diff options
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sample.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stage_constraints.h | 7 |
7 files changed, 182 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index d163c88761e..5117fd9c050 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -267,9 +267,15 @@ public: boost::intrusive_ptr<DocumentSource> mergingStage = nullptr; // If set, each document is expected to have sort key metadata which will be serialized in - // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are + // the '$sortKey' field. 'mergeSortPattern' will then be used to describe which fields are // ascending and which fields are descending when merging the streams together. - boost::optional<BSONObj> inputSortPattern = boost::none; + boost::optional<BSONObj> mergeSortPattern = boost::none; + + // If mergeSortPattern is specified and needsSplit is false, the split point will be + // deferred to the next stage that would split the pipeline. The sortPattern will be taken + // into account at that split point. Should be true if a stage specifies 'shardsStage' or + // 'mergingStage'. Does not mean anything if the sort pattern is not set. + bool needsSplit = true; }; virtual ~DocumentSource() {} diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index de6252449eb..e9e620d192a 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -604,7 +604,7 @@ protected: ASSERT(distributedPlanLogic); ASSERT(distributedPlanLogic->mergingStage); ASSERT_NOT_EQUALS(group(), distributedPlanLogic->mergingStage); - ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->inputSortPattern)); + ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->mergeSortPattern)); return distributedPlanLogic->mergingStage; } void checkResultSet(const intrusive_ptr<DocumentSource>& sink) { diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 6f3f95d16f1..1b5cb79ec7f 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -136,7 +136,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::dist // just used to communicate ascending/descending information. A pattern like {$meta: "randVal"} // is neither ascending nor descending, and so will not be useful when constructing the merging // logic. - logic.inputSortPattern = BSON("$rand" << -1); + logic.mergeSortPattern = BSON("$rand" << -1); return logic; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index efbc3f3fe42..044d5340578 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -254,7 +254,7 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() { DistributedPlanLogic split; split.shardsStage = this; - split.inputSortPattern = _sortExecutor->sortPattern() + split.mergeSortPattern = _sortExecutor->sortPattern() .serialize(SortPattern::SortKeySerialization::kForSortKeyMerging) .toBson(); if (auto limit = getLimit()) { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 75807b0232f..9b807f7fd23 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -3286,6 +3286,140 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit } } // namespace mustRunOnMongoS + +namespace DeferredSort { + +// Like a DocumentSourceMock, but has a deferrable merge sort. +class DocumentSourceDeferredMergeSort : public DocumentSourceMock { +public: + DocumentSourceDeferredMergeSort(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) {} + + static boost::intrusive_ptr<DocumentSourceDeferredMergeSort> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceDeferredMergeSort(expCtx); + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + DistributedPlanLogic logic; + + logic.mergingStage = nullptr; + logic.shardsStage = nullptr; + logic.mergeSortPattern = BSON("a" << 1); + logic.needsSplit = false; + + return logic; + } +}; + +class DocumentSourceCanSwapWithSort : public DocumentSourceMock { +public: + DocumentSourceCanSwapWithSort(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) {} + + static boost::intrusive_ptr<DocumentSourceCanSwapWithSort> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceCanSwapWithSort(expCtx); + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + return boost::none; + } + + // This is just to test splitting logic, doGetNext should not be called. + GetNextResult doGetNext() override { + MONGO_UNREACHABLE; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const override { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kAllowed, + ChangeStreamRequirement::kDenylist); + constraints.preservesOrderAndMetadata = true; + + return constraints; + } +}; + +using PipelineDeferredMergeSortTest = AggregationContextFixture; +using HostTypeRequirement = StageConstraints::HostTypeRequirement; + +TEST_F(PipelineDeferredMergeSortTest, StageWithDeferredSortDoesNotSplit) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto swappable = DocumentSourceCanSwapWithSort::create(expCtx); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, swappable, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the SplitPipeline stage, not on the deferred sort. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 2); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} + +TEST_F(PipelineDeferredMergeSortTest, EarliestSortIsSelectedIfDeferred) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto swappable = DocumentSourceCanSwapWithSort::create(expCtx); + auto sort = DocumentSourceSort::create(expCtx, fromjson("{NO: 1}")); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, swappable, sort, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the non-deferred sort. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} + +TEST_F(PipelineDeferredMergeSortTest, StageThatCantSwapGoesToMergingHalf) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto match = DocumentSourceMatch::create(fromjson("{a: 5}"), expCtx); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, match, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the stage that can't be swapped. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 1); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} +} // namespace DeferredSort } // namespace Sharded } // namespace Optimizations diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 957cfc6de40..505b1bca450 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -262,9 +262,18 @@ std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expC * Returns the sort specification if the input streams are sorted, and false otherwise. */ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) { + // Stages can specify that a merge sort must be performed sometime during the pipeline. Keep + // track of it until we hit the actual split point. + boost::optional<BSONObj> mergeSort = boost::none; while (!mergePipe->getSources().empty()) { boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); + // If we've deferred a sort, only push it past stages that don't change the sort order. + if (mergeSort && !current->constraints().preservesOrderAndMetadata) { + // This will break the merge sort, keep it in the merging half of the pipeline. + mergePipe->addInitialSource(std::move(current)); + return mergeSort; + } // Check if this source is splittable. auto distributedPlanLogic = current->distributedPlanLogic(); if (!distributedPlanLogic) { @@ -273,18 +282,38 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi continue; } + // If we got a plan logic with a sort that doesn't require a split, save it and keep going. + if (distributedPlanLogic->mergeSortPattern && !distributedPlanLogic->needsSplit) { + tassert(6441000, + "Cannot specify shardsStage or mergingStage and not require that the pipeline " + "be split", + !distributedPlanLogic->shardsStage && !distributedPlanLogic->mergingStage); + shardPipe->push_back(current); + mergeSort = distributedPlanLogic->mergeSortPattern; + continue; + } + // A source may not simultaneously be present on both sides of the split. invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); + tassert( + 6441001, + "Cannot specify shardsStage or mergingStage and not require that the pipeline be split", + distributedPlanLogic->needsSplit); + + if (distributedPlanLogic->shardsStage) shardPipe->push_back(std::move(distributedPlanLogic->shardsStage)); if (distributedPlanLogic->mergingStage) mergePipe->addInitialSource(std::move(distributedPlanLogic->mergingStage)); - return distributedPlanLogic->inputSortPattern; + /** + * The sort that was earlier in the pipeline takes precedence. + */ + return mergeSort ? mergeSort : distributedPlanLogic->mergeSortPattern; } - return boost::none; + return mergeSort; } /** diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index 661b9954195..072789dd757 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -331,6 +331,10 @@ struct StageConstraints { // Indicates that a stage is allowed within a pipeline-stlye update. bool isAllowedWithinUpdatePipeline = false; + // Indicates that a stage does not modify anything to do with a sort and can be done before a + // following merge sort. + bool preservesOrderAndMetadata = false; + bool operator==(const StageConstraints& other) const { return requiredPosition == other.requiredPosition && hostRequirement == other.hostRequirement && diskRequirement == other.diskRequirement && @@ -343,7 +347,8 @@ struct StageConstraints { canSwapWithMatch == other.canSwapWithMatch && canSwapWithSkippingOrLimitingStage == other.canSwapWithSkippingOrLimitingStage && isAllowedWithinUpdatePipeline == other.isAllowedWithinUpdatePipeline && - unionRequirement == other.unionRequirement; + unionRequirement == other.unionRequirement && + preservesOrderAndMetadata == other.preservesOrderAndMetadata; } }; } // namespace mongo |