summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2022-03-28 12:21:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-28 12:45:22 +0000
commitb977129dc70eed766cbee7e412d901ee213acbda (patch)
treeb2c54bf7b50d34ce799362e6c96fb9a560255eb5
parent94d982e0a8615b705a653e0ac6eba88e1bf3c3f0 (diff)
downloadmongo-r5.0.7.tar.gz
SERVER-64410 Add ability to defer merge sort in pipeliner5.0.7-rc1r5.0.7
-rw-r--r--src/mongo/db/pipeline/document_source.h10
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp134
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp33
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h7
7 files changed, 182 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index d163c88761e..5117fd9c050 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -267,9 +267,15 @@ public:
boost::intrusive_ptr<DocumentSource> mergingStage = nullptr;
// If set, each document is expected to have sort key metadata which will be serialized in
- // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are
+ // the '$sortKey' field. 'mergeSortPattern' will then be used to describe which fields are
// ascending and which fields are descending when merging the streams together.
- boost::optional<BSONObj> inputSortPattern = boost::none;
+ boost::optional<BSONObj> mergeSortPattern = boost::none;
+
+ // If mergeSortPattern is specified and needsSplit is false, the split point will be
+ // deferred to the next stage that would split the pipeline. The sortPattern will be taken
+ // into account at that split point. Should be true if a stage specifies 'shardsStage' or
+ // 'mergingStage'. Does not mean anything if the sort pattern is not set.
+ bool needsSplit = true;
};
virtual ~DocumentSource() {}
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index de6252449eb..e9e620d192a 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -604,7 +604,7 @@ protected:
ASSERT(distributedPlanLogic);
ASSERT(distributedPlanLogic->mergingStage);
ASSERT_NOT_EQUALS(group(), distributedPlanLogic->mergingStage);
- ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->inputSortPattern));
+ ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->mergeSortPattern));
return distributedPlanLogic->mergingStage;
}
void checkResultSet(const intrusive_ptr<DocumentSource>& sink) {
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 6f3f95d16f1..1b5cb79ec7f 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -136,7 +136,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::dist
// just used to communicate ascending/descending information. A pattern like {$meta: "randVal"}
// is neither ascending nor descending, and so will not be useful when constructing the merging
// logic.
- logic.inputSortPattern = BSON("$rand" << -1);
+ logic.mergeSortPattern = BSON("$rand" << -1);
return logic;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index efbc3f3fe42..044d5340578 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -254,7 +254,7 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() {
DistributedPlanLogic split;
split.shardsStage = this;
- split.inputSortPattern = _sortExecutor->sortPattern()
+ split.mergeSortPattern = _sortExecutor->sortPattern()
.serialize(SortPattern::SortKeySerialization::kForSortKeyMerging)
.toBson();
if (auto limit = getLimit()) {
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 75807b0232f..9b807f7fd23 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -3286,6 +3286,140 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit
}
} // namespace mustRunOnMongoS
+
+namespace DeferredSort {
+
+// Like a DocumentSourceMock, but has a deferrable merge sort.
+class DocumentSourceDeferredMergeSort : public DocumentSourceMock {
+public:
+ DocumentSourceDeferredMergeSort(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {}
+
+ static boost::intrusive_ptr<DocumentSourceDeferredMergeSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceDeferredMergeSort(expCtx);
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ DistributedPlanLogic logic;
+
+ logic.mergingStage = nullptr;
+ logic.shardsStage = nullptr;
+ logic.mergeSortPattern = BSON("a" << 1);
+ logic.needsSplit = false;
+
+ return logic;
+ }
+};
+
+class DocumentSourceCanSwapWithSort : public DocumentSourceMock {
+public:
+ DocumentSourceCanSwapWithSort(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {}
+
+ static boost::intrusive_ptr<DocumentSourceCanSwapWithSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceCanSwapWithSort(expCtx);
+ }
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ return boost::none;
+ }
+
+ // This is just to test splitting logic, doGetNext should not be called.
+ GetNextResult doGetNext() override {
+ MONGO_UNREACHABLE;
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed,
+ ChangeStreamRequirement::kDenylist);
+ constraints.preservesOrderAndMetadata = true;
+
+ return constraints;
+ }
+};
+
+using PipelineDeferredMergeSortTest = AggregationContextFixture;
+using HostTypeRequirement = StageConstraints::HostTypeRequirement;
+
+TEST_F(PipelineDeferredMergeSortTest, StageWithDeferredSortDoesNotSplit) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto swappable = DocumentSourceCanSwapWithSort::create(expCtx);
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, swappable, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the SplitPipeline stage, not on the deferred sort.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 2);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+
+TEST_F(PipelineDeferredMergeSortTest, EarliestSortIsSelectedIfDeferred) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto swappable = DocumentSourceCanSwapWithSort::create(expCtx);
+ auto sort = DocumentSourceSort::create(expCtx, fromjson("{NO: 1}"));
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, swappable, sort, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the non-deferred sort.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+
+TEST_F(PipelineDeferredMergeSortTest, StageThatCantSwapGoesToMergingHalf) {
+ auto expCtx = getExpCtx();
+
+ expCtx->inMongos = true;
+
+ auto mock = DocumentSourceDeferredMergeSort::create(expCtx);
+ auto match = DocumentSourceMatch::create(fromjson("{a: 5}"), expCtx);
+ auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
+ auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx);
+
+ auto pipeline = Pipeline::create({mock, match, split, runOnMongoS}, expCtx);
+
+ auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Verify that we've split the pipeline at the stage that can't be swapped.
+ ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 1);
+ ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3);
+
+ // Verify the sort is correct.
+ ASSERT(splitPipeline.shardCursorsSortSpec);
+ ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1));
+}
+} // namespace DeferredSort
} // namespace Sharded
} // namespace Optimizations
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 957cfc6de40..505b1bca450 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -262,9 +262,18 @@ std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expC
* Returns the sort specification if the input streams are sorted, and false otherwise.
*/
boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) {
+ // Stages can specify that a merge sort must be performed sometime during the pipeline. Keep
+ // track of it until we hit the actual split point.
+ boost::optional<BSONObj> mergeSort = boost::none;
while (!mergePipe->getSources().empty()) {
boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
+ // If we've deferred a sort, only push it past stages that don't change the sort order.
+ if (mergeSort && !current->constraints().preservesOrderAndMetadata) {
+ // This will break the merge sort, keep it in the merging half of the pipeline.
+ mergePipe->addInitialSource(std::move(current));
+ return mergeSort;
+ }
// Check if this source is splittable.
auto distributedPlanLogic = current->distributedPlanLogic();
if (!distributedPlanLogic) {
@@ -273,18 +282,38 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi
continue;
}
+ // If we got a plan logic with a sort that doesn't require a split, save it and keep going.
+ if (distributedPlanLogic->mergeSortPattern && !distributedPlanLogic->needsSplit) {
+ tassert(6441000,
+ "Cannot specify shardsStage or mergingStage and not require that the pipeline "
+ "be split",
+ !distributedPlanLogic->shardsStage && !distributedPlanLogic->mergingStage);
+ shardPipe->push_back(current);
+ mergeSort = distributedPlanLogic->mergeSortPattern;
+ continue;
+ }
+
// A source may not simultaneously be present on both sides of the split.
invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
+ tassert(
+ 6441001,
+ "Cannot specify shardsStage or mergingStage and not require that the pipeline be split",
+ distributedPlanLogic->needsSplit);
+
+
if (distributedPlanLogic->shardsStage)
shardPipe->push_back(std::move(distributedPlanLogic->shardsStage));
if (distributedPlanLogic->mergingStage)
mergePipe->addInitialSource(std::move(distributedPlanLogic->mergingStage));
- return distributedPlanLogic->inputSortPattern;
+ /**
+ * The sort that was earlier in the pipeline takes precedence.
+ */
+ return mergeSort ? mergeSort : distributedPlanLogic->mergeSortPattern;
}
- return boost::none;
+ return mergeSort;
}
/**
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
index 661b9954195..072789dd757 100644
--- a/src/mongo/db/pipeline/stage_constraints.h
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -331,6 +331,10 @@ struct StageConstraints {
// Indicates that a stage is allowed within a pipeline-stlye update.
bool isAllowedWithinUpdatePipeline = false;
+ // Indicates that a stage does not modify anything to do with a sort and can be done before a
+ // following merge sort.
+ bool preservesOrderAndMetadata = false;
+
bool operator==(const StageConstraints& other) const {
return requiredPosition == other.requiredPosition &&
hostRequirement == other.hostRequirement && diskRequirement == other.diskRequirement &&
@@ -343,7 +347,8 @@ struct StageConstraints {
canSwapWithMatch == other.canSwapWithMatch &&
canSwapWithSkippingOrLimitingStage == other.canSwapWithSkippingOrLimitingStage &&
isAllowedWithinUpdatePipeline == other.isAllowedWithinUpdatePipeline &&
- unionRequirement == other.unionRequirement;
+ unionRequirement == other.unionRequirement &&
+ preservesOrderAndMetadata == other.preservesOrderAndMetadata;
}
};
} // namespace mongo