From 0e967d6ed33cdf9eb8314e6ae3fb3e2261e213d7 Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Thu, 18 Mar 2021 20:39:55 +0000 Subject: SERVER-53638 Enable pushdown of config.cache.chunks $lookup through $sort Co-authored-by: Yuhong Zhang --- .../sharding/query/lookup_pushdown_through_sort.js | 75 +++++ src/mongo/db/matcher/rewrite_expr.cpp | 2 +- src/mongo/db/pipeline/document_source_group.cpp | 2 +- .../pipeline/document_source_set_window_fields.cpp | 2 +- src/mongo/db/pipeline/expression.h | 8 +- src/mongo/db/pipeline/pipeline_test.cpp | 38 ++- src/mongo/db/pipeline/semantic_analysis.cpp | 228 +++++++++++++-- src/mongo/db/pipeline/semantic_analysis.h | 26 +- src/mongo/db/pipeline/semantic_analysis_test.cpp | 313 +++++++++++++++++++++ src/mongo/db/pipeline/sharded_agg_helpers.cpp | 38 ++- .../db/query/sbe_stage_builder_expression.cpp | 4 +- 11 files changed, 701 insertions(+), 35 deletions(-) create mode 100644 jstests/sharding/query/lookup_pushdown_through_sort.js diff --git a/jstests/sharding/query/lookup_pushdown_through_sort.js b/jstests/sharding/query/lookup_pushdown_through_sort.js new file mode 100644 index 00000000000..2b33df100de --- /dev/null +++ b/jstests/sharding/query/lookup_pushdown_through_sort.js @@ -0,0 +1,75 @@ +/** + * Tests for resharding collection cloner's aggregation pipeline to ensure that $lookup on + * config.cache.chunks is pushed down to shards to execute as part of the split pipeline. + * + * @tags: [requires_fcv_49] + */ +(function() { +'use strict'; + +// Create a cluster with 2 shards. +const numShards = 2; +const st = new ShardingTest({shards: numShards}); +const db = st.s.getDB(`${jsTest.name()}_db`); + +function assertLookupRunsOnShards(explain) { + assert(explain.hasOwnProperty("splitPipeline"), tojson(explain)); + assert(explain.splitPipeline.hasOwnProperty("shardsPart"), tojson(explain)); + assert.eq( + explain.splitPipeline.shardsPart.filter(stage => stage.hasOwnProperty("$lookup")).length, + 1, + tojson(explain)); + assert(explain.splitPipeline.hasOwnProperty("mergerPart"), tojson(explain)); + assert.eq([], explain.splitPipeline.mergerPart, tojson(explain)); +} + +// Test that the explain's shardsPart section includes $lookup stage when executing the resharding +// collection cloning aggregation pipeline. +(function testLookupPushedDownToShards() { + const coll = db.coll; + coll.drop(); + // Shards the collection into two parts. + st.shardColl(coll, {a: "hashed"}, false, false); + const explain = coll.explain().aggregate([ + {$match: {$expr: {$gte: ['$_id', {$literal: 1}]}}}, + {$sort: {_id: 1}}, + {$replaceWith: {original: '$$ROOT'}}, + {$lookup: { + from: { + db: 'config', + coll: 'cache.chunks.test.system.resharding' + }, + let: {sk: [ + '$original.x', + {$toHashedIndexKey: '$original.y'} + ]}, + pipeline: [ + {$match: {$expr: {$eq: ['$shard', 'shard0']}}}, + {$match: {$expr: {$let: { + vars: { + min: {$map: {input: {$objectToArray: '$_id'}, in: '$$this.v'}}, + max: {$map: {input: {$objectToArray: '$max'}, in: '$$this.v'}} + }, + in: {$and: [ + {$gte: ['$$sk', '$$min']}, + {$cond: { + if: {$allElementsTrue: [{$map: { + input: '$$max', + in: {$eq: [{$type: '$$this'}, 'maxKey']} + }}]}, + then: {$lte: ['$$sk', '$$max']}, + else: {$lt : ['$$sk', '$$max']} + }} + ]} + }}}} + ], + as: 'intersectingChunk' + }}, + {$match: {intersectingChunk: {$ne: []}}}, + {$replaceWith: '$original'} + ]); + assertLookupRunsOnShards(explain); +})(); + +st.stop(); +})(); diff --git a/src/mongo/db/matcher/rewrite_expr.cpp b/src/mongo/db/matcher/rewrite_expr.cpp index 660073d5a8a..a523853a639 100644 --- a/src/mongo/db/matcher/rewrite_expr.cpp +++ b/src/mongo/db/matcher/rewrite_expr.cpp @@ -233,7 +233,7 @@ bool RewriteExpr::_canRewriteComparison( for (auto operand : operandList) { if (auto exprFieldPath = dynamic_cast(operand.get())) { - if (!exprFieldPath->isRootFieldPath()) { + if (exprFieldPath->isVariableReference()) { // This field path refers to a variable rather than a local document field path. return false; } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 51c7ccd3fdc..95653b3c21f 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -840,7 +840,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { } auto fieldPathExpr = dynamic_cast(_idExpressions.front().get()); - if (!fieldPathExpr || !fieldPathExpr->isRootFieldPath()) { + if (!fieldPathExpr || fieldPathExpr->isVariableReference()) { return nullptr; } diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp index c201b7da00b..c87f6732c54 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp @@ -163,7 +163,7 @@ list> document_source_set_window_fields::create( if (dynamic_cast(partitionBy->get())) { // partitionBy optimizes to a constant expression, equivalent to a single partition. } else if (auto exprFieldPath = dynamic_cast(partitionBy->get()); - exprFieldPath && exprFieldPath->isRootFieldPath()) { + exprFieldPath && !exprFieldPath->isVariableReference()) { // ExpressionFieldPath has "CURRENT" as an explicit first component, // but for $sort we don't want that. simplePartitionBy = exprFieldPath->getFieldPath().tail(); diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h index b9cc6dfe9b8..813b9caf91c 100644 --- a/src/mongo/db/pipeline/expression.h +++ b/src/mongo/db/pipeline/expression.h @@ -1485,7 +1485,11 @@ public: class ExpressionFieldPath final : public Expression { public: bool isRootFieldPath() const { - return _variable == Variables::kRootId; + return _variable == Variables::kRootId && _fieldPath.getPathLength() == 1; + } + + bool isVariableReference() const { + return Variables::isUserDefinedVariable(_variable); } boost::intrusive_ptr optimize() final; @@ -1988,7 +1992,7 @@ public: * Multiplies two values together as if by evaluate() on * {$multiply: [{$const: lhs}, {$const: rhs}]}. * - * Note that evaluate() does not use apply() directly, because when $muliply takes more than + * Note that evaluate() does not use apply() directly, because when $multiply takes more than * two arguments, it uses a wider intermediate state than Value. * * Returns BSONNULL if either argument is nullish. diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 6d9ba462d35..8f9d9d14ce9 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2564,6 +2564,39 @@ class LookupWithLetWithDBAndColl : public Base { } }; +class CollectionCloningPipeline : public Base { + string inputPipeJson() { + return "[{$match: {$expr: {$gte: ['$_id', {$literal: 1}]}}}" + ",{$sort: {_id: 1}}" + ",{$replaceWith: {original: '$$ROOT'}}" + ",{$lookup: {from: {db: 'config', coll: 'cache.chunks.test'}," + "pipeline: [], as: 'intersectingChunk'}}" + ",{$match: {intersectingChunk: {$ne: []}}}" + ",{$replaceWith: '$original'}" + "]"; + } + + string shardPipeJson() { + return "[{$match: {$and: [{_id: {$_internalExprGte: 1}}, {$expr: {$gte: ['$_id', " + "{$const: 1}]}}]}}" + ", {$sort: {sortKey: {_id: 1}}}" + ", {$replaceRoot: {newRoot: {original: '$$ROOT'}}}" + ", {$lookup: {from: {db: 'config', coll: 'cache.chunks.test'}, as: " + "'intersectingChunk', let: {}, pipeline: []}}" + ", {$match: {intersectingChunk: {$not: {$eq: []}}}}" + ", {$replaceRoot: {newRoot: '$original'}}" + "]"; + } + + string mergePipeJson() { + return "[]"; + } + + NamespaceString getLookupCollNs() override { + return {"config", "cache.chunks.test"}; + } +}; + } // namespace lookupFromShardsInParallel namespace moveFinalUnwindFromShardsToMerger { @@ -2691,7 +2724,7 @@ class MatchWithSkipAddFieldsAndLimit : public Base { /** * The addition of a $group stage between the $skip and $limit stages _does_ prevent us from * propagating the limit to the shards. The merger will need to see all the documents from each - * shard before it can aply the $limit. + * shard before it can apply the $limit. */ class MatchWithSkipGroupAndLimit : public Base { string inputPipeJson() { @@ -3172,7 +3205,7 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest, } /** - * For the purpsoses of this test, assume every collection is unsharded. Stages may ask this during + * For the purposes of this test, assume every collection is unsharded. Stages may ask this during * setup. For example, to compute its constraints, the $merge stage needs to know if the output * collection is sharded. */ @@ -4208,6 +4241,7 @@ public: add(); add(); add(); + add(); add(); add(); add(); diff --git a/src/mongo/db/pipeline/semantic_analysis.cpp b/src/mongo/db/pipeline/semantic_analysis.cpp index e44b0af957a..36f1985a913 100644 --- a/src/mongo/db/pipeline/semantic_analysis.cpp +++ b/src/mongo/db/pipeline/semantic_analysis.cpp @@ -30,6 +30,8 @@ #include "mongo/platform/basic.h" #include "mongo/db/matcher/expression_algo.h" +#include "mongo/db/pipeline/document_source_replace_root.h" +#include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/semantic_analysis.h" @@ -98,47 +100,214 @@ StringMap invertRenameMap(const StringMap& originalMap return reversedMap; } +const ReplaceRootTransformation* isReplaceRoot(const DocumentSource* source) { + // We have to use getSourceName() since DocumentSourceReplaceRoot is never materialized - it + // uses DocumentSourceSingleDocumentTransformation. + auto singleDocTransform = + dynamic_cast(source); + if (!singleDocTransform) { + return nullptr; + } + return dynamic_cast(&singleDocTransform->getTransformer()); +} + +/** + * Detects if 'replaceRootTransform' represents the nesting of a field path. If it does, returns + * the name of that field path. For example, if 'replaceRootTransform' represents the transformation + * associated with {$replaceWith: {nested: "$$ROOT"}} or {$replaceRoot: {newRoot: {nested: + * "$$ROOT"}}}, returns "nested". + */ +boost::optional replaceRootNestsRoot( + const ReplaceRootTransformation* replaceRootTransform) { + auto expressionObject = + dynamic_cast(replaceRootTransform->getExpression().get()); + if (!expressionObject) { + return boost::none; + } + auto children = expressionObject->getChildExpressions(); + if (children.size() != 1u) { + return boost::none; + } + auto&& [nestedName, expression] = children[0]; + if (!dynamic_cast(expression.get()) || + !dynamic_cast(expression.get())->isRootFieldPath()) { + return boost::none; + } + return nestedName; +} + +/** + * Detects if 'replaceRootTransform' represents the unnesting of a field path. If it does, returns + * the name of that field path. For example, if 'replaceRootTransform' represents the transformation + * associated with {$replaceWith: "$x"} or {$replaceRoot: {newRoot: "$x"}}, returns "x". + */ +boost::optional replaceRootUnnestsPath( + const ReplaceRootTransformation* replaceRootTransform) { + auto expressionFieldPath = + dynamic_cast(replaceRootTransform->getExpression().get()); + if (!expressionFieldPath) { + return boost::none; + } + return expressionFieldPath->getFieldPathWithoutCurrentPrefix().fullPath(); +} + /** - * Computes and returns a rename mapping for 'pathsOfInterest' over multiple aggregation pipeline - * stages. The range of pipeline stages we compute renames over is represented by the iterators - * 'start' and 'end'. If both 'start' and 'end' are reverse iterators, then 'start' should come - * after 'end' in the pipeline, 'traversalDir' should be "kBackward," 'pathsOfInterest' should be - * valid path names after stage 'start,' and this template will compute a mapping from the given - * names of 'pathsOfInterest' to their names as they were directly after stage 'end.'If both 'start' - * and 'end' are forwards iterators, then 'start' should come before 'end' in the pipeline, - * 'traversalDir' should be "kForward," 'pathsOfInterest' should be valid path names before stage - * 'start,' and this template will compute a mapping from the given names of 'pathsOfInterest' to - * their names as they are directly before stage 'end.' + * Looks for a pattern where the user temporarily nests the whole object, does some computation, + * then unnests the object. Like so: + * [{$replaceWith: {nested: "$$ROOT"}}, ..., {$replaceWith: "$nested"}]. + * + * If this pattern is detected, returns an iterator to the 'second' replace root, whichever is later + * according to the traversal order. + */ +template +boost::optional lookForNestUnnestPattern( + Iterator start, + Iterator end, + std::set pathsOfInterest, + const Direction& traversalDir, + boost::optional> additionalStageValidatorCallback) { + auto replaceRootTransform = isReplaceRoot((*start).get()); + if (!replaceRootTransform) { + return boost::none; + } + + auto targetName = traversalDir == Direction::kForward + ? replaceRootNestsRoot(replaceRootTransform) + : replaceRootUnnestsPath(replaceRootTransform); + if (!targetName || targetName->find(".") != std::string::npos) { + // Bail out early on dotted paths - we don't intend to deal with that complexity here, + // though we could in the future. + return boost::none; + } + auto nameTestCallback = + traversalDir == Direction::kForward ? replaceRootUnnestsPath : replaceRootNestsRoot; + + ++start; // Advance one to go past the first $replaceRoot we just looked at. + for (; start != end; ++start) { + replaceRootTransform = isReplaceRoot((*start).get()); + if (!replaceRootTransform) { + if (additionalStageValidatorCallback && + !((*additionalStageValidatorCallback)((*start).get()))) { + // There was an additional condition which failed - bail out. + return boost::none; + } + + auto renames = renamedPaths({*targetName}, **start, traversalDir); + if (!renames || + (renames->find(*targetName) != renames->end() && + (*renames)[*targetName] != *targetName)) { + // This stage is not a $replaceRoot - and it modifies our nested path + // ('targetName') somehow. + return boost::none; + } + // This is not a $replaceRoot - but it doesn't impact the nested path, so we continue + // searching for the unnester. + continue; + } + if (auto nestName = nameTestCallback(replaceRootTransform); + nestName && *nestName == *targetName) { + if (additionalStageValidatorCallback && + !((*additionalStageValidatorCallback)((*start).get()))) { + // There was an additional condition which failed - bail out. + return boost::none; + } + return start; + } else { + // If we have a replaceRoot which is not the one we're looking for - then it modifies + // the path we're trying to preserve. As a future enhancement, we maybe could recurse + // here. + return boost::none; + } + } + return boost::none; +} + +/** + * Computes and returns a rename mapping for 'pathsOfInterest' over multiple aggregation + * pipeline stages. The range of pipeline stages we consider renames over is represented by the + * iterators 'start' and 'end'. + * + * If both 'start' and 'end' are reverse iterators, then 'start' should come after 'end' in the + * pipeline, and 'traversalDir' should be "kBackward," 'pathsOfInterest' should be valid path names + * after stage 'start.' + * + * If both 'start' and 'end' are forwards iterators, then 'start' should come before 'end' in the + * pipeline, 'traversalDir' should be "kForward," and 'pathsOfInterest' should be valid path names + * before stage 'start.' + * + * This function will compute an iterator pointing to the "last" stage (farthest in the given + * direction, not included) which preserves 'pathsOfInterest' allowing renames, and returns that + * iterator and a mapping from the given names of 'pathsOfInterest' to their names as they were + * directly "before" (just previous to, according to the direction) the result iterator. If all + * stages preserve the paths of interest, returns 'end.' + * + * An optional 'additionalStageValidatorCallback' function can be provided to short-circuit this + * process and return an iterator to the first stage which either (a) does not preserve + * 'pathsOfInterest,' as before, or (b) does not meet this callback function's criteria. * * This should only be used internally; callers who need to track path renames through an - * aggregation pipeline should use one of the publically exposed options availible in the header. + * aggregation pipeline should use one of the publically exposed options available in the header. */ template -boost::optional> multiStageRenamedPaths( +std::pair> multiStageRenamedPaths( Iterator start, Iterator end, std::set pathsOfInterest, - const Direction& traversalDir) { - // The keys to this map will always be the original names of 'pathsOfInterest'. The values will - // be updated as we loop through the pipeline's stages to always be the most up-to-date name we - // know of for that path. + const Direction& traversalDir, + boost::optional> additionalStageValidatorCallback = + boost::none) { + // The keys to this map will always be the original names of 'pathsOfInterest'. The values + // will be updated as we loop through the pipeline's stages to always be the most up-to-date + // name we know of for that path. StringMap renameMap; for (auto&& path : pathsOfInterest) { renameMap[path] = path; } for (; start != end; ++start) { + if (additionalStageValidatorCallback && + !((*additionalStageValidatorCallback)((*start).get()))) { + // There was an additional condition which failed - bail out. + return {start, renameMap}; + } + auto renamed = renamedPaths(pathsOfInterest, **start, traversalDir); if (!renamed) { - return boost::none; + if (auto finalReplaceRoot = lookForNestUnnestPattern( + start, end, pathsOfInterest, traversalDir, additionalStageValidatorCallback)) { + // We've just detected a pattern where the user temporarily nests the whole + // object, does some computation, then unnests the object. Like so: + // [{$replaceWith: {nested: "$$ROOT"}}, ..., {$replaceWith: "$nested"}]. + // This analysis makes sure that the middle stages don't modify 'nested' or + // whatever the nesting field path is and the additional callback function's + // criteria is met. In this case, we can safely skip over all intervening stages and + // continue on our way. + start = *finalReplaceRoot; + continue; + } + return {start, renameMap}; } - //'pathsOfInterest' always holds the current names of the paths we're interested in, so it - // needs to be updated after each stage. + //'pathsOfInterest' always holds the current names of the paths we're interested in, so + // it needs to be updated after each stage. pathsOfInterest.clear(); for (auto it = renameMap.cbegin(); it != renameMap.cend(); ++it) { renameMap[it->first] = (*renamed)[it->second]; pathsOfInterest.emplace(it->second); } } + return {end, renameMap}; +} +template +boost::optional> renamedPathsFullPipeline( + Iterator start, + Iterator end, + std::set pathsOfInterest, + const Direction& traversalDir, + boost::optional> additionalStageValidatorCallback) { + auto [itr, renameMap] = multiStageRenamedPaths( + start, end, pathsOfInterest, traversalDir, additionalStageValidatorCallback); + if (itr != end) { + return boost::none; // The paths were not preserved to the very end. + } return renameMap; } @@ -232,15 +401,28 @@ boost::optional> renamedPaths(const std::set boost::optional> renamedPaths( const Pipeline::SourceContainer::const_iterator start, const Pipeline::SourceContainer::const_iterator end, - const std::set& pathsOfInterest) { - return multiStageRenamedPaths(start, end, pathsOfInterest, Direction::kForward); + const std::set& pathsOfInterest, + boost::optional> additionalStageValidatorCallback) { + return renamedPathsFullPipeline( + start, end, pathsOfInterest, Direction::kForward, additionalStageValidatorCallback); } boost::optional> renamedPaths( const Pipeline::SourceContainer::const_reverse_iterator start, const Pipeline::SourceContainer::const_reverse_iterator end, - const std::set& pathsOfInterest) { - return multiStageRenamedPaths(start, end, pathsOfInterest, Direction::kBackward); + const std::set& pathsOfInterest, + boost::optional> additionalStageValidatorCallback) { + return renamedPathsFullPipeline( + start, end, pathsOfInterest, Direction::kBackward, additionalStageValidatorCallback); } +std::pair> +findLongestViablePrefixPreservingPaths( + const Pipeline::SourceContainer::const_iterator start, + const Pipeline::SourceContainer::const_iterator end, + const std::set& pathsOfInterest, + boost::optional> additionalStageValidatorCallback) { + return multiStageRenamedPaths( + start, end, pathsOfInterest, Direction::kForward, additionalStageValidatorCallback); +} } // namespace mongo::semantic_analysis diff --git a/src/mongo/db/pipeline/semantic_analysis.h b/src/mongo/db/pipeline/semantic_analysis.h index 73739919e10..f7c0b3f56b5 100644 --- a/src/mongo/db/pipeline/semantic_analysis.h +++ b/src/mongo/db/pipeline/semantic_analysis.h @@ -72,7 +72,9 @@ boost::optional> renamedPaths(const std::set boost::optional> renamedPaths( const Pipeline::SourceContainer::const_iterator start, const Pipeline::SourceContainer::const_iterator end, - const std::set& pathsOfInterest); + const std::set& pathsOfInterest, + boost::optional> additionalStageValidatorCallback = + boost::none); /** * Tracks renames by walking a pipeline backwards. Takes two reverse iterators that represent two @@ -87,7 +89,24 @@ boost::optional> renamedPaths( boost::optional> renamedPaths( const Pipeline::SourceContainer::const_reverse_iterator start, const Pipeline::SourceContainer::const_reverse_iterator end, - const std::set& pathsOfInterest); + const std::set& pathsOfInterest, + boost::optional> additionalStageValidatorCallback = + boost::none); + +/** + * Attempts to find a maximal prefix of the pipeline given by 'start' and 'end' which will preserve + * all paths in 'pathsOfInterest' and also have each DocumentSource satisfy + * 'additionalStageValidatorCallback'. + * + * Returns an iterator to the first stage which modifies one of the paths in 'pathsOfInterest' or + * fails 'additionalStageValidatorCallback', or returns 'end' if no such stage exists. + */ +std::pair> +findLongestViablePrefixPreservingPaths(const Pipeline::SourceContainer::const_iterator start, + const Pipeline::SourceContainer::const_iterator end, + const std::set& pathsOfInterest, + boost::optional> + additionalStageValidatorCallback = boost::none); /** * Given a set of paths 'dependencies', determines which of those paths will be modified if all @@ -99,4 +118,7 @@ boost::optional> renamedPaths( std::set extractModifiedDependencies(const std::set& dependencies, const std::set& preservedPaths); +bool pathSetContainsOverlappingPath(const std::set& paths, + const std::string& targetPath); + } // namespace mongo::semantic_analysis diff --git a/src/mongo/db/pipeline/semantic_analysis_test.cpp b/src/mongo/db/pipeline/semantic_analysis_test.cpp index a2fcf808327..ca95d9098a2 100644 --- a/src/mongo/db/pipeline/semantic_analysis_test.cpp +++ b/src/mongo/db/pipeline/semantic_analysis_test.cpp @@ -439,5 +439,318 @@ TEST_F(SemanticAnalysisRenamedPaths, ReturnsNoneWhenModificationsAreNotKnown) { } } +TEST_F(SemanticAnalysisRenamedPaths, DetectsSimpleReplaceRootPattern) { + auto pipeline = Pipeline::parse( + {fromjson("{$replaceWith: {nested: '$$ROOT'}}"), fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"}); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_TRUE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternAllowsIntermediateStages) { + auto pipeline = + Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"), + fromjson("{$match: {bigEnough: true}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"}); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_TRUE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, AdditionalStageValidatorCallbackPassed) { + auto pipeline = + Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"), + fromjson("{$match: {bigEnough: true}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + std::function callback = [](DocumentSource* stage) { + return !static_cast(stage->distributedPlanLogic()); + }; + { + auto renames = renamedPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}, callback); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = renamedPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"b"}, callback); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = renamedPaths( + pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}, callback); + ASSERT_TRUE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, AdditionalStageValidatorCallbackNotPassed) { + auto pipeline = + Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"), + fromjson("{$match: {bigEnough: true}}"), + fromjson("{$sort: {x: 1}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_TRUE(static_cast(renames)); + } + std::function callback = [](DocumentSource* stage) { + return !static_cast(stage->distributedPlanLogic()); + }; + { + auto renames = renamedPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}, callback); + ASSERT_FALSE(static_cast(renames)); + } + { + auto renames = renamedPaths( + pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}, callback); + ASSERT_FALSE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternDisallowsIntermediateModification) { + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set: {'nested.field': 'anyNewValue'}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_FALSE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfTypoed) { + auto pipeline = Pipeline::parse( + {fromjson("{$replaceWith: {nested: '$$ROOT'}}"), fromjson("{$replaceWith: '$nestedTypo'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_FALSE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternIfCurrentInsteadOfROOT) { + auto pipeline = Pipeline::parse( + {fromjson("{$replaceWith: {nested: '$$CURRENT'}}"), fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_TRUE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_TRUE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfNoROOT) { + auto pipeline = Pipeline::parse( + {fromjson("{$replaceWith: {nested: '$subObj'}}"), fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + { + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_FALSE(static_cast(renames)); + } + { + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } +} + +TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfTargetPathIsRenamed) { + + { + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$unset : 'nested'}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + auto renames = + renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT_FALSE(static_cast(renames)); + } + { + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set : {nested: '$somethingElese'}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } + { + // This case could someday work - we leave it as a future improvement. + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set : {somethingElse: '$nested'}}"), + fromjson("{$replaceWith: '$somethingElse'}")}, + getExpCtx()); + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } + { + // This is a tricky one. The pattern does exist, but it's doubly nested and only unnested + // once. + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$replaceWith: {doubleNested: '$nested'}}"), + fromjson("{$replaceWith: '$doubleNested'}")}, + getExpCtx()); + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } + { + // Similar to above but double nested then double unnested. We could someday make this work, + // but leave it for a future improvement. + auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$replaceWith: {doubleNested: '$nested'}}"), + fromjson("{$replaceWith: '$doubleNested'}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + auto renames = + renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}); + ASSERT_FALSE(static_cast(renames)); + } +} + +using SemanticAnalysisFindLongestViablePrefix = AggregationContextFixture; +TEST_F(SemanticAnalysisFindLongestViablePrefix, AllowsReplaceRootPattern) { + auto pipeline = + Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"), + fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"), + fromjson("{$match: {bigEnough: true}}"), + fromjson("{$replaceWith: '$nested'}")}, + getExpCtx()); + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT(itr == pipeline->getSources().end()); +} + +TEST_F(SemanticAnalysisFindLongestViablePrefix, FindsPrefixWithoutReplaceRoot) { + auto pipeline = Pipeline::parse({fromjson("{$match: {testing: true}}"), + fromjson("{$unset: 'unset'}"), + fromjson("{$set: {x: '$y'}}")}, + getExpCtx()); + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}); + ASSERT(itr == pipeline->getSources().end()); + } + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"unset"}); + ASSERT(itr == std::next(pipeline->getSources().begin())); + } + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"y"}); + ASSERT(itr == pipeline->getSources().end()); + ASSERT(renames["y"] == "x"); + } + { + // TODO (SERVER-55815): "x" should be considered modified in the $set stage. + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"x"}); + ASSERT(itr == pipeline->getSources().end()); + ASSERT(renames["x"] == "x"); + } +} + +TEST_F(SemanticAnalysisFindLongestViablePrefix, FindsLastPossibleStageWithCallback) { + auto pipeline = Pipeline::parse({fromjson("{$match: {testing: true}}"), + fromjson("{$unset: 'unset'}"), + fromjson("{$sort: {y: 1}}"), + fromjson("{$set: {x: '$y'}}")}, + getExpCtx()); + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"y"}); + ASSERT(itr == pipeline->getSources().end()); + ASSERT(renames["y"] == "x"); + } + std::function callback = [](DocumentSource* stage) { + return !static_cast(stage->distributedPlanLogic()); + }; + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"y"}, callback); + ASSERT(itr == std::prev(std::prev(pipeline->getSources().end()))); + ASSERT(renames["y"] == "y"); + } +} + +TEST_F(SemanticAnalysisFindLongestViablePrefix, CorrectlyAnswersReshardingUseCase) { + auto expCtx = getExpCtx(); + auto lookupNss = NamespaceString{"config.cache.chunks.test"}; + expCtx->setResolvedNamespace(lookupNss, ExpressionContext::ResolvedNamespace{lookupNss, {}}); + auto pipeline = + Pipeline::parse({fromjson("{$replaceWith: {original: '$$ROOT'}}"), + fromjson("{$lookup: {from: {db: 'config', coll: 'cache.chunks.test'}, " + "pipeline: [], as: 'intersectingChunk'}}"), + fromjson("{$match: {intersectingChunk: {$ne: []}}}"), + fromjson("{$replaceWith: '$original'}")}, + getExpCtx()); + std::function callback = [](DocumentSource* stage) { + return !static_cast(stage->distributedPlanLogic()); + }; + { + auto [itr, renames] = findLongestViablePrefixPreservingPaths( + pipeline->getSources().begin(), pipeline->getSources().end(), {"_id"}, callback); + ASSERT(itr == pipeline->getSources().end()); + ASSERT(renames["_id"] == "_id"); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index a3296e95f45..678d5b24caf 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -298,6 +298,38 @@ void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) } } +/** + * When the last stage of shard pipeline is $sort, move stages that can run on shards and don't + * rename or modify the fields in $sort from merge pipeline. The function starts from the beginning + * of the merge pipeline and finds the first consecutive eligible stages. + */ +void moveEligibleStreamingStagesBeforeSortOnShards(Pipeline* shardPipe, + Pipeline* mergePipe, + const BSONObj& sortPattern) { + tassert(5363800, + "Expected non-empty shardPipe consisting of at least a $sort stage", + !shardPipe->getSources().empty()); + if (!dynamic_cast(shardPipe->getSources().back().get())) { + // Expected last stage on the shards to be a $sort. + return; + } + auto sortPaths = sortPattern.getFieldNames>(); + auto firstMergeStage = mergePipe->getSources().cbegin(); + std::function distributedPlanLogicCallback = [](DocumentSource* stage) { + return !static_cast(stage->distributedPlanLogic()); + }; + auto [lastUnmodified, renameMap] = semantic_analysis::findLongestViablePrefixPreservingPaths( + firstMergeStage, mergePipe->getSources().cend(), sortPaths, distributedPlanLogicCallback); + for (const auto& sortPath : sortPaths) { + auto pair = renameMap.find(sortPath); + if (pair == renameMap.end() || pair->first != pair->second) { + return; + } + } + shardPipe->getSources().insert(shardPipe->getSources().end(), firstMergeStage, lastUnmodified); + mergePipe->getSources().erase(firstMergeStage, lastUnmodified); +} + /** * Returns true if the final stage of the pipeline limits the number of documents it could output * (such as a $limit stage). @@ -775,6 +807,10 @@ SplitPipeline splitPipeline(std::unique_ptr pipeline) // The order in which optimizations are applied can have significant impact on the efficiency of // the final pipeline. Be Careful! + if (inputsSort) { + moveEligibleStreamingStagesBeforeSortOnShards( + shardsPipeline.get(), mergePipeline.get(), *inputsSort); + } moveFinalUnwindFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); propagateDocLimitToShards(shardsPipeline.get(), mergePipeline.get()); limitFieldsSentFromShardsToMerger(shardsPipeline.get(), mergePipeline.get()); @@ -857,7 +893,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // pipeline; if not, we retain the existing pipeline. // - Call establishShardCursors to dispatch the aggregation to the targeted shards. // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the - // entire aggregation commmand. + // entire aggregation command. auto cursors = std::vector(); auto shardResults = std::vector(); auto opCtx = expCtx->opCtx; diff --git a/src/mongo/db/query/sbe_stage_builder_expression.cpp b/src/mongo/db/query/sbe_stage_builder_expression.cpp index 5c68c224b85..6353d5e31a1 100644 --- a/src/mongo/db/query/sbe_stage_builder_expression.cpp +++ b/src/mongo/db/query/sbe_stage_builder_expression.cpp @@ -1173,7 +1173,7 @@ public: collatorSlot, _context->planNodeId); - // Create a branch stage to select between the branch that produces one null if any eleemnts + // Create a branch stage to select between the branch that produces one null if any elements // in the original input were null or missing, or otherwise select the branch that unwinds // and concatenates elements into the output array. auto [nullExpr, nullStage] = makeNullLimitCoscanTree(); @@ -1790,7 +1790,7 @@ public: } sbe::value::SlotId slotId; - if (expr->isRootFieldPath()) { + if (!expr->isVariableReference()) { slotId = _context->rootSlot; } else { auto it = _context->environment.find(expr->getVariableId()); -- cgit v1.2.1