From 93641f442e33eb76ae3833dbc894e22326c044e5 Mon Sep 17 00:00:00 2001 From: samontea Date: Tue, 4 May 2021 00:06:58 +0000 Subject: SERVER-54573 Dependency analysis for $setWindowFields stage --- .../pipeline/document_source_set_window_fields.h | 59 +++++++++++++--- .../document_source_set_window_fields_test.cpp | 78 ++++++++++++++++++++++ src/mongo/db/pipeline/document_source_sort.cpp | 9 +-- .../window_function/window_function_expression.h | 6 ++ src/mongo/db/query/sort_pattern.cpp | 10 +++ src/mongo/db/query/sort_pattern.h | 2 + 6 files changed, 149 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h index ca21d04cafa..e0c004bf7fa 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.h +++ b/src/mongo/db/pipeline/document_source_set_window_fields.h @@ -33,6 +33,7 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_set_window_fields_gen.h" +#include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/memory_usage_tracker.h" #include "mongo/db/pipeline/window_function/partition_iterator.h" #include "mongo/db/pipeline/window_function/window_bounds.h" @@ -54,6 +55,21 @@ struct WindowFunctionStatement { static WindowFunctionStatement parse(BSONElement elem, const boost::optional& sortBy, ExpressionContext* expCtx); + + void addDependencies(DepsTracker* deps) const { + if (expr) { + expr->addDependencies(deps); + } + + const FieldPath path(fieldName); + + // We do this because acting on "a.b" where a is an object also depends on "a" not being + // changed (e.g. to a non-object). + for (size_t i = 0; i < path.getPathLength() - 1; i++) { + deps->fields.insert(path.getSubpath(i).toString()); + } + } + void serialize(MutableDocument& outputFields, boost::optional explain) const; }; @@ -99,21 +115,48 @@ public: _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy), _memoryTracker{false /* allowDiskUse */, maxMemoryBytes} {}; + GetModPathsReturn getModifiedPaths() const final { + std::set outputPaths; + for (auto&& outputField : _outputFields) { + outputPaths.insert(outputField.fieldName); + } + + return {DocumentSource::GetModPathsReturn::Type::kFiniteSet, std::move(outputPaths), {}}; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { - return StageConstraints(StreamType::kBlocking, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed, - TransactionRequirement::kAllowed, - LookupRequirement::kAllowed, - UnionRequirement::kAllowed); + StageConstraints constraints(StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); + return constraints; } const char* getSourceName() const { return kStageName.rawData(); }; + DepsTracker::State getDependencies(DepsTracker* deps) const final { + if (_sortBy) { + _sortBy->addDependencies(deps); + } + + if (_partitionBy && (*_partitionBy)) { + (*_partitionBy)->addDependencies(deps); + } + + for (auto&& outputField : _outputFields) { + outputField.addDependencies(deps); + } + + return DepsTracker::State::SEE_NEXT; + } + boost::optional distributedPlanLogic() { // Force to run on the merging half for now. return DistributedPlanLogic{nullptr, this, boost::none}; diff --git a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp index d7146860429..287497db445 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp @@ -134,5 +134,83 @@ TEST_F(DocumentSourceSetWindowFieldsTest, HandlesEmptyInputCorrectly) { (int)parsedStage->getNext().getStatus()); } +TEST_F(DocumentSourceSetWindowFieldsTest, HandlesDependencyWithArrayExpression) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {partitionBy: '$a', sortBy: {b: 1}, output: {myCov: + {$covariancePop: ['$c', '$d']}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DepsTracker deps(DepsTracker::kAllMetadata); + ASSERT_EQUALS(parsedStage->getDependencies(&deps), DepsTracker::State::SEE_NEXT); + ASSERT_EQUALS(deps.fields.size(), 4U); + ASSERT_EQUALS(deps.fields.count("a"), 1U); + ASSERT_EQUALS(deps.fields.count("b"), 1U); + ASSERT_EQUALS(deps.fields.count("c"), 1U); + ASSERT_EQUALS(deps.fields.count("d"), 1U); +} + +TEST_F(DocumentSourceSetWindowFieldsTest, HandlesDependencyWithNoSort) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {partitionBy: '$a', output: {myAvg: + {$avg: '$c'}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DepsTracker deps(DepsTracker::kAllMetadata); + ASSERT_EQUALS(parsedStage->getDependencies(&deps), DepsTracker::State::SEE_NEXT); + ASSERT_EQUALS(deps.fields.size(), 2U); + ASSERT_EQUALS(deps.fields.count("a"), 1U); + ASSERT_EQUALS(deps.fields.count("c"), 1U); +} + +TEST_F(DocumentSourceSetWindowFieldsTest, HandlesDependencyWithNoPartitionBy) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {output: {myAvg: + {$avg: '$c'}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DepsTracker deps(DepsTracker::kAllMetadata); + ASSERT_EQUALS(parsedStage->getDependencies(&deps), DepsTracker::State::SEE_NEXT); + ASSERT_EQUALS(deps.fields.size(), 1U); + ASSERT_EQUALS(deps.fields.count("c"), 1U); +} + +TEST_F(DocumentSourceSetWindowFieldsTest, HandlesDependencyWithNoInputDependency) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {output: {myCount: + {$sum: 1}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DepsTracker deps(DepsTracker::kAllMetadata); + ASSERT_EQUALS(parsedStage->getDependencies(&deps), DepsTracker::State::SEE_NEXT); + ASSERT_EQUALS(deps.fields.size(), 0U); +} + +TEST_F(DocumentSourceSetWindowFieldsTest, HandlesImplicitDependencyForDottedOutputField) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {output: {'x.y.z': + {$sum: 1}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DepsTracker deps(DepsTracker::kAllMetadata); + ASSERT_EQUALS(parsedStage->getDependencies(&deps), DepsTracker::State::SEE_NEXT); + ASSERT_EQUALS(deps.fields.size(), 2U); + ASSERT_EQUALS(deps.fields.count("x"), 1U); + ASSERT_EQUALS(deps.fields.count("x.y"), 1U); + ASSERT_EQUALS(deps.fields.count("x.y.z"), 0U); +} + +TEST_F(DocumentSourceSetWindowFieldsTest, ReportsModifiedFields) { + auto spec = fromjson(R"( + {$_internalSetWindowFields: {output: {a: + {$sum: 1}, b: {$sum: 1}}}})"); + auto parsedStage = + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + auto modified = parsedStage->getModifiedPaths(); + ASSERT_TRUE(modified.type == DocumentSource::GetModPathsReturn::Type::kFiniteSet); + ASSERT_EQUALS(modified.paths.size(), 2U); + ASSERT_EQUALS(modified.paths.count("a"), 1U); + ASSERT_EQUALS(modified.paths.count("b"), 1U); + ASSERT_TRUE(modified.renames.empty()); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index f27dc68c0dd..1dc94d03efb 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -172,13 +172,8 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( } DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const { - for (auto&& keyPart : _sortExecutor->sortPattern()) { - if (keyPart.expression) { - keyPart.expression->addDependencies(deps); - } else { - deps->fields.insert(keyPart.fieldPath->fullPath()); - } - } + _sortExecutor->sortPattern().addDependencies(deps); + if (pExpCtx->needsMerge) { // Include the sort key if we will merge several sorted streams later. deps->setNeedsMetadata(DocumentMetadataFields::kSortKey, true); diff --git a/src/mongo/db/pipeline/window_function/window_function_expression.h b/src/mongo/db/pipeline/window_function/window_function_expression.h index c74d274f142..1f1a66a7807 100644 --- a/src/mongo/db/pipeline/window_function/window_function_expression.h +++ b/src/mongo/db/pipeline/window_function/window_function_expression.h @@ -143,6 +143,12 @@ public: virtual std::unique_ptr buildRemovable() const = 0; + void addDependencies(DepsTracker* deps) const { + if (_input) { + _input->addDependencies(deps); + } + }; + virtual Value serialize(boost::optional explain) const { MutableDocument args; diff --git a/src/mongo/db/query/sort_pattern.cpp b/src/mongo/db/query/sort_pattern.cpp index a7fff3f9061..185da1bb804 100644 --- a/src/mongo/db/query/sort_pattern.cpp +++ b/src/mongo/db/query/sort_pattern.cpp @@ -132,4 +132,14 @@ Document SortPattern::serialize(SortKeySerialization serializationMode) const { } return keyObj.freeze(); } + +void SortPattern::addDependencies(DepsTracker* deps) const { + for (auto&& keyPart : _sortPattern) { + if (keyPart.expression) { + keyPart.expression->addDependencies(deps); + } else { + deps->fields.insert(keyPart.fieldPath->fullPath()); + } + } +} } // namespace mongo diff --git a/src/mongo/db/query/sort_pattern.h b/src/mongo/db/query/sort_pattern.h index 2fad0144c7d..6ba87271081 100644 --- a/src/mongo/db/query/sort_pattern.h +++ b/src/mongo/db/query/sort_pattern.h @@ -86,6 +86,8 @@ public: return _sortPattern.empty(); } + void addDependencies(DepsTracker* deps) const; + /** * Singleton sort patterns are a special case. In memory, sort keys for singleton patterns get * stored as a single Value, corresponding to the single component of the sort pattern. By -- cgit v1.2.1