summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_test.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp134
1 files changed, 134 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 75807b0232f..9b807f7fd23 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -3286,6 +3286,140 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit
}
} // namespace mustRunOnMongoS
+
+namespace DeferredSort {
+
+// Like a DocumentSourceMock, but has a deferrable merge sort.
+class DocumentSourceDeferredMergeSort : public DocumentSourceMock {
+public:
+ DocumentSourceDeferredMergeSort(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {}
+
+ static boost::intrusive_ptr<DocumentSourceDeferredMergeSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceDeferredMergeSort(expCtx);
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ DistributedPlanLogic logic;
+
+ logic.mergingStage = nullptr;
+ logic.shardsStage = nullptr;
+ logic.mergeSortPattern = BSON("a" << 1);
+ logic.needsSplit = false;
+
+ return logic;
+ }
+};
+
+class DocumentSourceCanSwapWithSort : public DocumentSourceMock {
+public:
+ DocumentSourceCanSwapWithSort(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {}
+
+ static boost::intrusive_ptr<DocumentSourceCanSwapWithSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceCanSwapWithSort(expCtx);
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ return boost::none;
+ }
+
+ // This is just to test splitting logic, doGetNext should not be called.
+ GetNextResult doGetNext() override {
+ MONGO_UNREACHABLE;
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed,
+ ChangeStreamRequirement::kDenylist);
+ constraints.preservesOrderAndMetadata = true;
+
+ return constraints;
+ }
+};
+
+using PipelineDeferredMergeSortTest = AggregationContextFixture;
+using HostTypeRequirement = StageConstraints::HostTypeRequirement;
+
+TEST_F(PipelineDeferredMergeSortTest, StageWithDeferredSortDoesNotSplit) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto swappable = DocumentSourceCanSwapWithSort::create(expCtx);
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, swappable, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the SplitPipeline stage, not on the deferred sort.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 2);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+
+TEST_F(PipelineDeferredMergeSortTest, EarliestSortIsSelectedIfDeferred) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto swappable = DocumentSourceCanSwapWithSort::create(expCtx);
+ auto sort = DocumentSourceSort::create(expCtx, fromjson("{NO: 1}"));
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, swappable, sort, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the non-deferred sort.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+
+TEST_F(PipelineDeferredMergeSortTest, StageThatCantSwapGoesToMergingHalf) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto match = DocumentSourceMatch::create(fromjson("{a: 5}"), expCtx);
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, match, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the stage that can't be swapped.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 1);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+} // namespace DeferredSort
} // namespace Sharded
} // namespace Optimizations