summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2021-03-18 20:39:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-09 15:53:25 +0000
commit0e967d6ed33cdf9eb8314e6ae3fb3e2261e213d7 (patch)
tree9a74cb4ea7fffee339a1b1ecc8505e67b901ac87
parent6a09a2b69726407c07521be98baaf5ba6c86373f (diff)
downloadmongo-0e967d6ed33cdf9eb8314e6ae3fb3e2261e213d7.tar.gz
SERVER-53638 Enable pushdown of config.cache.chunks $lookup through $sort
Co-authored-by: Yuhong Zhang <danielzhangyh@gmail.com>
-rw-r--r--jstests/sharding/query/lookup_pushdown_through_sort.js75
-rw-r--r--src/mongo/db/matcher/rewrite_expr.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.cpp2
-rw-r--r--src/mongo/db/pipeline/expression.h8
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp38
-rw-r--r--src/mongo/db/pipeline/semantic_analysis.cpp228
-rw-r--r--src/mongo/db/pipeline/semantic_analysis.h26
-rw-r--r--src/mongo/db/pipeline/semantic_analysis_test.cpp313
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp38
-rw-r--r--src/mongo/db/query/sbe_stage_builder_expression.cpp4
11 files changed, 701 insertions, 35 deletions
diff --git a/jstests/sharding/query/lookup_pushdown_through_sort.js b/jstests/sharding/query/lookup_pushdown_through_sort.js
new file mode 100644
index 00000000000..2b33df100de
--- /dev/null
+++ b/jstests/sharding/query/lookup_pushdown_through_sort.js
@@ -0,0 +1,75 @@
+/**
+ * Tests for resharding collection cloner's aggregation pipeline to ensure that $lookup on
+ * config.cache.chunks is pushed down to shards to execute as part of the split pipeline.
+ *
+ * @tags: [requires_fcv_49]
+ */
+(function() {
+'use strict';
+
+// Create a cluster with 2 shards.
+const numShards = 2;
+const st = new ShardingTest({shards: numShards});
+const db = st.s.getDB(`${jsTest.name()}_db`);
+
+function assertLookupRunsOnShards(explain) {
+ assert(explain.hasOwnProperty("splitPipeline"), tojson(explain));
+ assert(explain.splitPipeline.hasOwnProperty("shardsPart"), tojson(explain));
+ assert.eq(
+ explain.splitPipeline.shardsPart.filter(stage => stage.hasOwnProperty("$lookup")).length,
+ 1,
+ tojson(explain));
+ assert(explain.splitPipeline.hasOwnProperty("mergerPart"), tojson(explain));
+ assert.eq([], explain.splitPipeline.mergerPart, tojson(explain));
+}
+
+// Test that the explain's shardsPart section includes $lookup stage when executing the resharding
+// collection cloning aggregation pipeline.
+(function testLookupPushedDownToShards() {
+ const coll = db.coll;
+ coll.drop();
+ // Shards the collection into two parts.
+ st.shardColl(coll, {a: "hashed"}, false, false);
+ const explain = coll.explain().aggregate([
+ {$match: {$expr: {$gte: ['$_id', {$literal: 1}]}}},
+ {$sort: {_id: 1}},
+ {$replaceWith: {original: '$$ROOT'}},
+ {$lookup: {
+ from: {
+ db: 'config',
+ coll: 'cache.chunks.test.system.resharding'
+ },
+ let: {sk: [
+ '$original.x',
+ {$toHashedIndexKey: '$original.y'}
+ ]},
+ pipeline: [
+ {$match: {$expr: {$eq: ['$shard', 'shard0']}}},
+ {$match: {$expr: {$let: {
+ vars: {
+ min: {$map: {input: {$objectToArray: '$_id'}, in: '$$this.v'}},
+ max: {$map: {input: {$objectToArray: '$max'}, in: '$$this.v'}}
+ },
+ in: {$and: [
+ {$gte: ['$$sk', '$$min']},
+ {$cond: {
+ if: {$allElementsTrue: [{$map: {
+ input: '$$max',
+ in: {$eq: [{$type: '$$this'}, 'maxKey']}
+ }}]},
+ then: {$lte: ['$$sk', '$$max']},
+ else: {$lt : ['$$sk', '$$max']}
+ }}
+ ]}
+ }}}}
+ ],
+ as: 'intersectingChunk'
+ }},
+ {$match: {intersectingChunk: {$ne: []}}},
+ {$replaceWith: '$original'}
+ ]);
+ assertLookupRunsOnShards(explain);
+})();
+
+st.stop();
+})();
diff --git a/src/mongo/db/matcher/rewrite_expr.cpp b/src/mongo/db/matcher/rewrite_expr.cpp
index 660073d5a8a..a523853a639 100644
--- a/src/mongo/db/matcher/rewrite_expr.cpp
+++ b/src/mongo/db/matcher/rewrite_expr.cpp
@@ -233,7 +233,7 @@ bool RewriteExpr::_canRewriteComparison(
for (auto operand : operandList) {
if (auto exprFieldPath = dynamic_cast<ExpressionFieldPath*>(operand.get())) {
- if (!exprFieldPath->isRootFieldPath()) {
+ if (exprFieldPath->isVariableReference()) {
// This field path refers to a variable rather than a local document field path.
return false;
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 51c7ccd3fdc..95653b3c21f 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -840,7 +840,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
}
auto fieldPathExpr = dynamic_cast<ExpressionFieldPath*>(_idExpressions.front().get());
- if (!fieldPathExpr || !fieldPathExpr->isRootFieldPath()) {
+ if (!fieldPathExpr || fieldPathExpr->isVariableReference()) {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index c201b7da00b..c87f6732c54 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -163,7 +163,7 @@ list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::create(
if (dynamic_cast<ExpressionConstant*>(partitionBy->get())) {
// partitionBy optimizes to a constant expression, equivalent to a single partition.
} else if (auto exprFieldPath = dynamic_cast<ExpressionFieldPath*>(partitionBy->get());
- exprFieldPath && exprFieldPath->isRootFieldPath()) {
+ exprFieldPath && !exprFieldPath->isVariableReference()) {
// ExpressionFieldPath has "CURRENT" as an explicit first component,
// but for $sort we don't want that.
simplePartitionBy = exprFieldPath->getFieldPath().tail();
diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h
index b9cc6dfe9b8..813b9caf91c 100644
--- a/src/mongo/db/pipeline/expression.h
+++ b/src/mongo/db/pipeline/expression.h
@@ -1485,7 +1485,11 @@ public:
class ExpressionFieldPath final : public Expression {
public:
bool isRootFieldPath() const {
- return _variable == Variables::kRootId;
+ return _variable == Variables::kRootId && _fieldPath.getPathLength() == 1;
+ }
+
+ bool isVariableReference() const {
+ return Variables::isUserDefinedVariable(_variable);
}
boost::intrusive_ptr<Expression> optimize() final;
@@ -1988,7 +1992,7 @@ public:
* Multiplies two values together as if by evaluate() on
* {$multiply: [{$const: lhs}, {$const: rhs}]}.
*
- * Note that evaluate() does not use apply() directly, because when $muliply takes more than
+ * Note that evaluate() does not use apply() directly, because when $multiply takes more than
* two arguments, it uses a wider intermediate state than Value.
*
* Returns BSONNULL if either argument is nullish.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 6d9ba462d35..8f9d9d14ce9 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2564,6 +2564,39 @@ class LookupWithLetWithDBAndColl : public Base {
}
};
+class CollectionCloningPipeline : public Base {
+ string inputPipeJson() {
+ return "[{$match: {$expr: {$gte: ['$_id', {$literal: 1}]}}}"
+ ",{$sort: {_id: 1}}"
+ ",{$replaceWith: {original: '$$ROOT'}}"
+ ",{$lookup: {from: {db: 'config', coll: 'cache.chunks.test'},"
+ "pipeline: [], as: 'intersectingChunk'}}"
+ ",{$match: {intersectingChunk: {$ne: []}}}"
+ ",{$replaceWith: '$original'}"
+ "]";
+ }
+
+ string shardPipeJson() {
+ return "[{$match: {$and: [{_id: {$_internalExprGte: 1}}, {$expr: {$gte: ['$_id', "
+ "{$const: 1}]}}]}}"
+ ", {$sort: {sortKey: {_id: 1}}}"
+ ", {$replaceRoot: {newRoot: {original: '$$ROOT'}}}"
+ ", {$lookup: {from: {db: 'config', coll: 'cache.chunks.test'}, as: "
+ "'intersectingChunk', let: {}, pipeline: []}}"
+ ", {$match: {intersectingChunk: {$not: {$eq: []}}}}"
+ ", {$replaceRoot: {newRoot: '$original'}}"
+ "]";
+ }
+
+ string mergePipeJson() {
+ return "[]";
+ }
+
+ NamespaceString getLookupCollNs() override {
+ return {"config", "cache.chunks.test"};
+ }
+};
+
} // namespace lookupFromShardsInParallel
namespace moveFinalUnwindFromShardsToMerger {
@@ -2691,7 +2724,7 @@ class MatchWithSkipAddFieldsAndLimit : public Base {
/**
* The addition of a $group stage between the $skip and $limit stages _does_ prevent us from
* propagating the limit to the shards. The merger will need to see all the documents from each
- * shard before it can aply the $limit.
+ * shard before it can apply the $limit.
*/
class MatchWithSkipGroupAndLimit : public Base {
string inputPipeJson() {
@@ -3172,7 +3205,7 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest,
}
/**
- * For the purpsoses of this test, assume every collection is unsharded. Stages may ask this during
+ * For the purposes of this test, assume every collection is unsharded. Stages may ask this during
* setup. For example, to compute its constraints, the $merge stage needs to know if the output
* collection is sharded.
*/
@@ -4208,6 +4241,7 @@ public:
add<Optimizations::Sharded::limitFieldsSentFromShardsToMerger::ShardAlreadyExhaustive>();
add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithDBAndColl>();
add<Optimizations::Sharded::lookupFromShardsInParallel::LookupWithLetWithDBAndColl>();
+ add<Optimizations::Sharded::lookupFromShardsInParallel::CollectionCloningPipeline>();
add<Optimizations::Sharded::needsPrimaryShardMerger::Out>();
add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithUnshardedCollection>();
add<Optimizations::Sharded::needsPrimaryShardMerger::MergeWithShardedCollection>();
diff --git a/src/mongo/db/pipeline/semantic_analysis.cpp b/src/mongo/db/pipeline/semantic_analysis.cpp
index e44b0af957a..36f1985a913 100644
--- a/src/mongo/db/pipeline/semantic_analysis.cpp
+++ b/src/mongo/db/pipeline/semantic_analysis.cpp
@@ -30,6 +30,8 @@
#include "mongo/platform/basic.h"
#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
+#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/semantic_analysis.h"
@@ -98,47 +100,214 @@ StringMap<std::string> invertRenameMap(const StringMap<std::string>& originalMap
return reversedMap;
}
+const ReplaceRootTransformation* isReplaceRoot(const DocumentSource* source) {
+ // We cannot cast to DocumentSourceReplaceRoot directly since it is never materialized - it uses
+ // DocumentSourceSingleDocumentTransformation, so we inspect that stage's transformer instead.
+ auto singleDocTransform =
+ dynamic_cast<const DocumentSourceSingleDocumentTransformation*>(source);
+ if (!singleDocTransform) {
+ return nullptr;
+ }
+ return dynamic_cast<const ReplaceRootTransformation*>(&singleDocTransform->getTransformer());
+}
+
+/**
+ * Detects if 'replaceRootTransform' represents the nesting of a field path. If it does, returns
+ * the name of that field path. For example, if 'replaceRootTransform' represents the transformation
+ * associated with {$replaceWith: {nested: "$$ROOT"}} or {$replaceRoot: {newRoot: {nested:
+ * "$$ROOT"}}}, returns "nested".
+ */
+boost::optional<std::string> replaceRootNestsRoot(
+ const ReplaceRootTransformation* replaceRootTransform) {
+ auto expressionObject =
+ dynamic_cast<ExpressionObject*>(replaceRootTransform->getExpression().get());
+ if (!expressionObject) {
+ return boost::none;
+ }
+ auto children = expressionObject->getChildExpressions();
+ if (children.size() != 1u) {
+ return boost::none;
+ }
+ auto&& [nestedName, expression] = children[0];
+ if (!dynamic_cast<ExpressionFieldPath*>(expression.get()) ||
+ !dynamic_cast<ExpressionFieldPath*>(expression.get())->isRootFieldPath()) {
+ return boost::none;
+ }
+ return nestedName;
+}
+
+/**
+ * Detects if 'replaceRootTransform' represents the unnesting of a field path. If it does, returns
+ * the name of that field path. For example, if 'replaceRootTransform' represents the transformation
+ * associated with {$replaceWith: "$x"} or {$replaceRoot: {newRoot: "$x"}}, returns "x".
+ */
+boost::optional<std::string> replaceRootUnnestsPath(
+ const ReplaceRootTransformation* replaceRootTransform) {
+ auto expressionFieldPath =
+ dynamic_cast<ExpressionFieldPath*>(replaceRootTransform->getExpression().get());
+ if (!expressionFieldPath) {
+ return boost::none;
+ }
+ return expressionFieldPath->getFieldPathWithoutCurrentPrefix().fullPath();
+}
+
/**
- * Computes and returns a rename mapping for 'pathsOfInterest' over multiple aggregation pipeline
- * stages. The range of pipeline stages we compute renames over is represented by the iterators
- * 'start' and 'end'. If both 'start' and 'end' are reverse iterators, then 'start' should come
- * after 'end' in the pipeline, 'traversalDir' should be "kBackward," 'pathsOfInterest' should be
- * valid path names after stage 'start,' and this template will compute a mapping from the given
- * names of 'pathsOfInterest' to their names as they were directly after stage 'end.'If both 'start'
- * and 'end' are forwards iterators, then 'start' should come before 'end' in the pipeline,
- * 'traversalDir' should be "kForward," 'pathsOfInterest' should be valid path names before stage
- * 'start,' and this template will compute a mapping from the given names of 'pathsOfInterest' to
- * their names as they are directly before stage 'end.'
+ * Looks for a pattern where the user temporarily nests the whole object, does some computation,
+ * then unnests the object. Like so:
+ * [{$replaceWith: {nested: "$$ROOT"}}, ..., {$replaceWith: "$nested"}].
+ *
+ * If this pattern is detected, returns an iterator to the 'second' replace root, whichever is later
+ * according to the traversal order.
+ */
+template <class Iterator>
+boost::optional<Iterator> lookForNestUnnestPattern(
+ Iterator start,
+ Iterator end,
+ std::set<std::string> pathsOfInterest,
+ const Direction& traversalDir,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback) {
+ auto replaceRootTransform = isReplaceRoot((*start).get());
+ if (!replaceRootTransform) {
+ return boost::none;
+ }
+
+ auto targetName = traversalDir == Direction::kForward
+ ? replaceRootNestsRoot(replaceRootTransform)
+ : replaceRootUnnestsPath(replaceRootTransform);
+ if (!targetName || targetName->find(".") != std::string::npos) {
+ // Bail out early on dotted paths - we don't intend to deal with that complexity here,
+ // though we could in the future.
+ return boost::none;
+ }
+ auto nameTestCallback =
+ traversalDir == Direction::kForward ? replaceRootUnnestsPath : replaceRootNestsRoot;
+
+ ++start; // Advance one to go past the first $replaceRoot we just looked at.
+ for (; start != end; ++start) {
+ replaceRootTransform = isReplaceRoot((*start).get());
+ if (!replaceRootTransform) {
+ if (additionalStageValidatorCallback &&
+ !((*additionalStageValidatorCallback)((*start).get()))) {
+ // There was an additional condition which failed - bail out.
+ return boost::none;
+ }
+
+ auto renames = renamedPaths({*targetName}, **start, traversalDir);
+ if (!renames ||
+ (renames->find(*targetName) != renames->end() &&
+ (*renames)[*targetName] != *targetName)) {
+ // This stage is not a $replaceRoot - and it modifies our nested path
+ // ('targetName') somehow.
+ return boost::none;
+ }
+ // This is not a $replaceRoot - but it doesn't impact the nested path, so we continue
+ // searching for the unnester.
+ continue;
+ }
+ if (auto nestName = nameTestCallback(replaceRootTransform);
+ nestName && *nestName == *targetName) {
+ if (additionalStageValidatorCallback &&
+ !((*additionalStageValidatorCallback)((*start).get()))) {
+ // There was an additional condition which failed - bail out.
+ return boost::none;
+ }
+ return start;
+ } else {
+ // If we have a replaceRoot which is not the one we're looking for - then it modifies
+ // the path we're trying to preserve. As a future enhancement, we maybe could recurse
+ // here.
+ return boost::none;
+ }
+ }
+ return boost::none;
+}
+
+/**
+ * Computes and returns a rename mapping for 'pathsOfInterest' over multiple aggregation
+ * pipeline stages. The range of pipeline stages we consider renames over is represented by the
+ * iterators 'start' and 'end'.
+ *
+ * If both 'start' and 'end' are reverse iterators, then 'start' should come after 'end' in the
+ * pipeline, 'traversalDir' should be "kBackward," and 'pathsOfInterest' should be valid path names
+ * after stage 'start.'
+ *
+ * If both 'start' and 'end' are forwards iterators, then 'start' should come before 'end' in the
+ * pipeline, 'traversalDir' should be "kForward," and 'pathsOfInterest' should be valid path names
+ * before stage 'start.'
+ *
+ * This function will compute an iterator pointing to the "last" stage (farthest in the given
+ * direction, not included) which preserves 'pathsOfInterest' allowing renames, and returns that
+ * iterator and a mapping from the given names of 'pathsOfInterest' to their names as they were
+ * directly "before" (just previous to, according to the direction) the result iterator. If all
+ * stages preserve the paths of interest, returns 'end.'
+ *
+ * An optional 'additionalStageValidatorCallback' function can be provided to short-circuit this
+ * process and return an iterator to the first stage which either (a) does not preserve
+ * 'pathsOfInterest,' as before, or (b) does not meet this callback function's criteria.
*
* This should only be used internally; callers who need to track path renames through an
- * aggregation pipeline should use one of the publically exposed options availible in the header.
+ * aggregation pipeline should use one of the publicly exposed options available in the header.
*/
template <class Iterator>
-boost::optional<StringMap<std::string>> multiStageRenamedPaths(
+std::pair<Iterator, StringMap<std::string>> multiStageRenamedPaths(
Iterator start,
Iterator end,
std::set<std::string> pathsOfInterest,
- const Direction& traversalDir) {
- // The keys to this map will always be the original names of 'pathsOfInterest'. The values will
- // be updated as we loop through the pipeline's stages to always be the most up-to-date name we
- // know of for that path.
+ const Direction& traversalDir,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback =
+ boost::none) {
+ // The keys to this map will always be the original names of 'pathsOfInterest'. The values
+ // will be updated as we loop through the pipeline's stages to always be the most up-to-date
+ // name we know of for that path.
StringMap<std::string> renameMap;
for (auto&& path : pathsOfInterest) {
renameMap[path] = path;
}
for (; start != end; ++start) {
+ if (additionalStageValidatorCallback &&
+ !((*additionalStageValidatorCallback)((*start).get()))) {
+ // There was an additional condition which failed - bail out.
+ return {start, renameMap};
+ }
+
auto renamed = renamedPaths(pathsOfInterest, **start, traversalDir);
if (!renamed) {
- return boost::none;
+ if (auto finalReplaceRoot = lookForNestUnnestPattern<Iterator>(
+ start, end, pathsOfInterest, traversalDir, additionalStageValidatorCallback)) {
+ // We've just detected a pattern where the user temporarily nests the whole
+ // object, does some computation, then unnests the object. Like so:
+ // [{$replaceWith: {nested: "$$ROOT"}}, ..., {$replaceWith: "$nested"}].
+ // This analysis makes sure that the middle stages don't modify 'nested' or
+ // whatever the nesting field path is and the additional callback function's
+ // criteria are met. In this case, we can safely skip over all intervening stages and
+ // continue on our way.
+ start = *finalReplaceRoot;
+ continue;
+ }
+ return {start, renameMap};
}
- //'pathsOfInterest' always holds the current names of the paths we're interested in, so it
- // needs to be updated after each stage.
+ //'pathsOfInterest' always holds the current names of the paths we're interested in, so
+ // it needs to be updated after each stage.
pathsOfInterest.clear();
for (auto it = renameMap.cbegin(); it != renameMap.cend(); ++it) {
renameMap[it->first] = (*renamed)[it->second];
pathsOfInterest.emplace(it->second);
}
}
+ return {end, renameMap};
+}
+template <class Iterator>
+boost::optional<StringMap<std::string>> renamedPathsFullPipeline(
+ Iterator start,
+ Iterator end,
+ std::set<std::string> pathsOfInterest,
+ const Direction& traversalDir,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback) {
+ auto [itr, renameMap] = multiStageRenamedPaths(
+ start, end, pathsOfInterest, traversalDir, additionalStageValidatorCallback);
+ if (itr != end) {
+ return boost::none; // The paths were not preserved to the very end.
+ }
return renameMap;
}
@@ -232,15 +401,28 @@ boost::optional<StringMap<std::string>> renamedPaths(const std::set<std::string>
boost::optional<StringMap<std::string>> renamedPaths(
const Pipeline::SourceContainer::const_iterator start,
const Pipeline::SourceContainer::const_iterator end,
- const std::set<std::string>& pathsOfInterest) {
- return multiStageRenamedPaths(start, end, pathsOfInterest, Direction::kForward);
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback) {
+ return renamedPathsFullPipeline(
+ start, end, pathsOfInterest, Direction::kForward, additionalStageValidatorCallback);
}
boost::optional<StringMap<std::string>> renamedPaths(
const Pipeline::SourceContainer::const_reverse_iterator start,
const Pipeline::SourceContainer::const_reverse_iterator end,
- const std::set<std::string>& pathsOfInterest) {
- return multiStageRenamedPaths(start, end, pathsOfInterest, Direction::kBackward);
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback) {
+ return renamedPathsFullPipeline(
+ start, end, pathsOfInterest, Direction::kBackward, additionalStageValidatorCallback);
}
+std::pair<Pipeline::SourceContainer::const_iterator, StringMap<std::string>>
+findLongestViablePrefixPreservingPaths(
+ const Pipeline::SourceContainer::const_iterator start,
+ const Pipeline::SourceContainer::const_iterator end,
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback) {
+ return multiStageRenamedPaths(
+ start, end, pathsOfInterest, Direction::kForward, additionalStageValidatorCallback);
+}
} // namespace mongo::semantic_analysis
diff --git a/src/mongo/db/pipeline/semantic_analysis.h b/src/mongo/db/pipeline/semantic_analysis.h
index 73739919e10..f7c0b3f56b5 100644
--- a/src/mongo/db/pipeline/semantic_analysis.h
+++ b/src/mongo/db/pipeline/semantic_analysis.h
@@ -72,7 +72,9 @@ boost::optional<StringMap<std::string>> renamedPaths(const std::set<std::string>
boost::optional<StringMap<std::string>> renamedPaths(
const Pipeline::SourceContainer::const_iterator start,
const Pipeline::SourceContainer::const_iterator end,
- const std::set<std::string>& pathsOfInterest);
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback =
+ boost::none);
/**
* Tracks renames by walking a pipeline backwards. Takes two reverse iterators that represent two
@@ -87,7 +89,24 @@ boost::optional<StringMap<std::string>> renamedPaths(
boost::optional<StringMap<std::string>> renamedPaths(
const Pipeline::SourceContainer::const_reverse_iterator start,
const Pipeline::SourceContainer::const_reverse_iterator end,
- const std::set<std::string>& pathsOfInterest);
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>> additionalStageValidatorCallback =
+ boost::none);
+
+/**
+ * Attempts to find a maximal prefix of the pipeline given by 'start' and 'end' which will preserve
+ * all paths in 'pathsOfInterest' and also have each DocumentSource satisfy
+ * 'additionalStageValidatorCallback'.
+ *
+ * Returns a pair: an iterator to the first stage which modifies a path in 'pathsOfInterest' or
+ * fails 'additionalStageValidatorCallback' ('end' if none), and the rename map up to that stage.
+ */
+std::pair<Pipeline::SourceContainer::const_iterator, StringMap<std::string>>
+findLongestViablePrefixPreservingPaths(const Pipeline::SourceContainer::const_iterator start,
+ const Pipeline::SourceContainer::const_iterator end,
+ const std::set<std::string>& pathsOfInterest,
+ boost::optional<std::function<bool(DocumentSource*)>>
+ additionalStageValidatorCallback = boost::none);
/**
* Given a set of paths 'dependencies', determines which of those paths will be modified if all
@@ -99,4 +118,7 @@ boost::optional<StringMap<std::string>> renamedPaths(
std::set<std::string> extractModifiedDependencies(const std::set<std::string>& dependencies,
const std::set<std::string>& preservedPaths);
+bool pathSetContainsOverlappingPath(const std::set<std::string>& paths,
+ const std::string& targetPath);
+
} // namespace mongo::semantic_analysis
diff --git a/src/mongo/db/pipeline/semantic_analysis_test.cpp b/src/mongo/db/pipeline/semantic_analysis_test.cpp
index a2fcf808327..ca95d9098a2 100644
--- a/src/mongo/db/pipeline/semantic_analysis_test.cpp
+++ b/src/mongo/db/pipeline/semantic_analysis_test.cpp
@@ -439,5 +439,318 @@ TEST_F(SemanticAnalysisRenamedPaths, ReturnsNoneWhenModificationsAreNotKnown) {
}
}
+TEST_F(SemanticAnalysisRenamedPaths, DetectsSimpleReplaceRootPattern) {
+ auto pipeline = Pipeline::parse(
+ {fromjson("{$replaceWith: {nested: '$$ROOT'}}"), fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternAllowsIntermediateStages) {
+ auto pipeline =
+ Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+ fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"),
+ fromjson("{$match: {bigEnough: true}}"),
+ fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, AdditionalStageValidatorCallbackPassed) {
+ auto pipeline =
+ Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+ fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"),
+ fromjson("{$match: {bigEnough: true}}"),
+ fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ std::function<bool(DocumentSource*)> callback = [](DocumentSource* stage) {
+ return !static_cast<bool>(stage->distributedPlanLogic());
+ };
+ {
+ auto renames = renamedPaths(
+ pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}, callback);
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renamedPaths(
+ pipeline->getSources().begin(), pipeline->getSources().end(), {"b"}, callback);
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renamedPaths(
+ pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}, callback);
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, AdditionalStageValidatorCallbackNotPassed) {
+ auto pipeline =
+ Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+ fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"),
+ fromjson("{$match: {bigEnough: true}}"),
+ fromjson("{$sort: {x: 1}}"),
+ fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ std::function<bool(DocumentSource*)> callback = [](DocumentSource* stage) {
+ return !static_cast<bool>(stage->distributedPlanLogic());
+ };
+ {
+ auto renames = renamedPaths(
+ pipeline->getSources().begin(), pipeline->getSources().end(), {"a"}, callback);
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames = renamedPaths(
+ pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"}, callback);
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternDisallowsIntermediateModification) {
+ auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+ fromjson("{$set: {'nested.field': 'anyNewValue'}}"),
+ fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"b"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfTypoed) {
+ auto pipeline = Pipeline::parse(
+ {fromjson("{$replaceWith: {nested: '$$ROOT'}}"), fromjson("{$replaceWith: '$nestedTypo'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, DetectsReplaceRootPatternIfCurrentInsteadOfROOT) {
+ auto pipeline = Pipeline::parse(
+ {fromjson("{$replaceWith: {nested: '$$CURRENT'}}"), fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_TRUE(static_cast<bool>(renames));
+ }
+}
+
+TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfNoROOT) {
+ auto pipeline = Pipeline::parse(
+ {fromjson("{$replaceWith: {nested: '$subObj'}}"), fromjson("{$replaceWith: '$nested'}")},
+ getExpCtx());
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().begin(), pipeline->getSources().end(), {"a"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+ {
+ auto renames =
+ renamedPaths(pipeline->getSources().rbegin(), pipeline->getSources().rend(), {"b"});
+ ASSERT_FALSE(static_cast<bool>(renames));
+ }
+}
+
+// Negative cases where the document is nested under 'nested' via '$$ROOT' but the stages between
+// (or around) the nesting and the final un-nesting disturb the nested path. In every case
+// renamedPaths() must produce a result that converts to false.
+TEST_F(SemanticAnalysisRenamedPaths, DoesNotDetectFalseReplaceRootIfTargetPathIsRenamed) {
+    {
+        // The nested document is removed by $unset before it is promoted (forward traversal).
+        auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                                         fromjson("{$unset : 'nested'}"),
+                                         fromjson("{$replaceWith: '$nested'}")},
+                                        getExpCtx());
+        auto& stages = pipeline->getSources();
+        auto result = renamedPaths(stages.begin(), stages.end(), {"a"});
+        ASSERT_FALSE(static_cast<bool>(result));
+    }
+    {
+        // The nested document is overwritten by $set before it is promoted (reverse traversal).
+        auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                                         fromjson("{$set : {nested: '$somethingElese'}}"),
+                                         fromjson("{$replaceWith: '$nested'}")},
+                                        getExpCtx());
+        auto& stages = pipeline->getSources();
+        auto result = renamedPaths(stages.rbegin(), stages.rend(), {"b"});
+        ASSERT_FALSE(static_cast<bool>(result));
+    }
+    {
+        // The nested document is copied to another field and that copy is promoted. This case
+        // could someday work - we leave it as a future improvement.
+        auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                                         fromjson("{$set : {somethingElse: '$nested'}}"),
+                                         fromjson("{$replaceWith: '$somethingElse'}")},
+                                        getExpCtx());
+        auto& stages = pipeline->getSources();
+        auto result = renamedPaths(stages.rbegin(), stages.rend(), {"b"});
+        ASSERT_FALSE(static_cast<bool>(result));
+    }
+    {
+        // This is a tricky one. The pattern does exist, but the document is nested twice and only
+        // un-nested once.
+        auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                                         fromjson("{$replaceWith: {doubleNested: '$nested'}}"),
+                                         fromjson("{$replaceWith: '$doubleNested'}")},
+                                        getExpCtx());
+        auto& stages = pipeline->getSources();
+        auto result = renamedPaths(stages.rbegin(), stages.rend(), {"b"});
+        ASSERT_FALSE(static_cast<bool>(result));
+    }
+    {
+        // Similar to the previous case, but nested twice and then un-nested twice. We could
+        // someday make this work, but leave it for a future improvement.
+        auto pipeline = Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                                         fromjson("{$replaceWith: {doubleNested: '$nested'}}"),
+                                         fromjson("{$replaceWith: '$doubleNested'}"),
+                                         fromjson("{$replaceWith: '$nested'}")},
+                                        getExpCtx());
+        auto& stages = pipeline->getSources();
+        auto result = renamedPaths(stages.rbegin(), stages.rend(), {"b"});
+        ASSERT_FALSE(static_cast<bool>(result));
+    }
+}
+
+// Fixture alias for the findLongestViablePrefixPreservingPaths() tests.
+using SemanticAnalysisFindLongestViablePrefix = AggregationContextFixture;
+
+// The document is nested under 'nested', filtered on a field computed from the nested copy, and
+// then promoted back to the top level. For path "a" the viable prefix is expected to span the
+// entire pipeline.
+TEST_F(SemanticAnalysisFindLongestViablePrefix, AllowsReplaceRootPattern) {
+    auto pipeline =
+        Pipeline::parse({fromjson("{$replaceWith: {nested: '$$ROOT'}}"),
+                         fromjson("{$set: {bigEnough: {$gte: [{$bsonSize: '$nested'}, 300]}}}"),
+                         fromjson("{$match: {bigEnough: true}}"),
+                         fromjson("{$replaceWith: '$nested'}")},
+                        getExpCtx());
+    auto& stages = pipeline->getSources();
+    auto [prefixEnd, pathMap] =
+        findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"a"});
+    ASSERT(prefixEnd == stages.end());
+}
+
+// Exercises findLongestViablePrefixPreservingPaths() on a $match / $unset / $set pipeline with no
+// $replaceWith, checking both where the prefix ends and what the tracked path maps to.
+TEST_F(SemanticAnalysisFindLongestViablePrefix, FindsPrefixWithoutReplaceRoot) {
+    auto pipeline = Pipeline::parse({fromjson("{$match: {testing: true}}"),
+                                     fromjson("{$unset: 'unset'}"),
+                                     fromjson("{$set: {x: '$y'}}")},
+                                    getExpCtx());
+    auto& stages = pipeline->getSources();
+    {
+        // "a" is not mentioned by any stage, so the prefix covers the whole pipeline.
+        auto [prefixEnd, pathMap] =
+            findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"a"});
+        ASSERT(prefixEnd == stages.end());
+    }
+    {
+        // "unset" is removed by the second stage, so the prefix ends right after the $match.
+        auto [prefixEnd, pathMap] =
+            findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"unset"});
+        ASSERT(prefixEnd == std::next(stages.begin()));
+    }
+    {
+        // "y" is copied to "x" by the $set; the whole pipeline is viable and "y" maps to "x".
+        auto [prefixEnd, pathMap] =
+            findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"y"});
+        ASSERT(prefixEnd == stages.end());
+        ASSERT(pathMap["y"] == "x");
+    }
+    {
+        // TODO (SERVER-55815): "x" should be considered modified in the $set stage.
+        auto [prefixEnd, pathMap] =
+            findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"x"});
+        ASSERT(prefixEnd == stages.end());
+        ASSERT(pathMap["x"] == "x");
+    }
+}
+
+// Compares findLongestViablePrefixPreservingPaths() with and without a stage-filtering callback on
+// a $match / $unset / $sort / $set pipeline.
+TEST_F(SemanticAnalysisFindLongestViablePrefix, FindsLastPossibleStageWithCallback) {
+    auto pipeline = Pipeline::parse({fromjson("{$match: {testing: true}}"),
+                                     fromjson("{$unset: 'unset'}"),
+                                     fromjson("{$sort: {y: 1}}"),
+                                     fromjson("{$set: {x: '$y'}}")},
+                                    getExpCtx());
+    auto& stages = pipeline->getSources();
+    {
+        // No callback: the whole pipeline is viable for "y", which ends up mapped to "x".
+        auto [prefixEnd, pathMap] =
+            findLongestViablePrefixPreservingPaths(stages.begin(), stages.end(), {"y"});
+        ASSERT(prefixEnd == stages.end());
+        ASSERT(pathMap["y"] == "x");
+    }
+
+    // Accepts only stages that report no distributed plan logic.
+    std::function<bool(DocumentSource*)> hasNoDistributedPlanLogic = [](DocumentSource* stage) {
+        const bool needsSplit = static_cast<bool>(stage->distributedPlanLogic());
+        return !needsSplit;
+    };
+    {
+        // With the callback the prefix is expected to end two positions before end(), which is
+        // the $sort stage, and "y" still maps to itself at that point.
+        auto [prefixEnd, pathMap] = findLongestViablePrefixPreservingPaths(
+            stages.begin(), stages.end(), {"y"}, hasNoDistributedPlanLogic);
+        ASSERT(prefixEnd == std::prev(stages.end(), 2));
+        ASSERT(pathMap["y"] == "y");
+    }
+}
+
+// Models the pipeline shape described as the resharding use case: nest the document under
+// 'original', $lookup into config.cache.chunks.test, filter on the lookup result, then promote
+// 'original' again. With a callback accepting only stages without distributed plan logic, the
+// whole pipeline must be viable for "_id" and "_id" must map to itself.
+TEST_F(SemanticAnalysisFindLongestViablePrefix, CorrectlyAnswersReshardingUseCase) {
+    // Register the $lookup target namespace on the expression context before parsing.
+    // NOTE(review): presumably required for the $lookup stage to parse - confirm.
+    auto expCtx = getExpCtx();
+    auto lookupNss = NamespaceString{"config.cache.chunks.test"};
+    expCtx->setResolvedNamespace(lookupNss, ExpressionContext::ResolvedNamespace{lookupNss, {}});
+
+    auto pipeline =
+        Pipeline::parse({fromjson("{$replaceWith: {original: '$$ROOT'}}"),
+                         fromjson("{$lookup: {from: {db: 'config', coll: 'cache.chunks.test'}, "
+                                  "pipeline: [], as: 'intersectingChunk'}}"),
+                         fromjson("{$match: {intersectingChunk: {$ne: []}}}"),
+                         fromjson("{$replaceWith: '$original'}")},
+                        getExpCtx());
+    auto& stages = pipeline->getSources();
+
+    // Accepts only stages that report no distributed plan logic.
+    std::function<bool(DocumentSource*)> hasNoDistributedPlanLogic = [](DocumentSource* stage) {
+        const bool needsSplit = static_cast<bool>(stage->distributedPlanLogic());
+        return !needsSplit;
+    };
+
+    auto [prefixEnd, pathMap] = findLongestViablePrefixPreservingPaths(
+        stages.begin(), stages.end(), {"_id"}, hasNoDistributedPlanLogic);
+    ASSERT(prefixEnd == stages.end());
+    ASSERT(pathMap["_id"] == "_id");
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index a3296e95f45..678d5b24caf 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -299,6 +299,38 @@ void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe)
}
/**
+ * If the shard pipeline ends with a $sort, relocates stages from the front of the merge pipeline
+ * to the end of the shard pipeline. The candidates are the leading run of merge stages returned
+ * by semantic_analysis::findLongestViablePrefixPreservingPaths() for the $sort's field names,
+ * called with a callback that accepts only stages reporting no distributedPlanLogic(). Nothing is
+ * moved unless every sort path is present in the returned rename map under its original name.
+ */
+void moveEligibleStreamingStagesBeforeSortOnShards(Pipeline* shardPipe,
+                                                   Pipeline* mergePipe,
+                                                   const BSONObj& sortPattern) {
+    auto& shardStages = shardPipe->getSources();
+    tassert(5363800,
+            "Expected non-empty shardPipe consisting of at least a $sort stage",
+            !shardStages.empty());
+
+    const auto* trailingSort = dynamic_cast<DocumentSourceSort*>(shardStages.back().get());
+    if (trailingSort == nullptr) {
+        // The last stage on the shards is not a $sort, so there is nothing to move.
+        return;
+    }
+
+    auto& mergeStages = mergePipe->getSources();
+    auto sortPaths = sortPattern.getFieldNames<std::set<std::string>>();
+    auto mergeBegin = mergeStages.cbegin();
+
+    // Accepts only stages that report no distributed plan logic.
+    std::function<bool(DocumentSource*)> hasNoDistributedPlanLogic = [](DocumentSource* stage) {
+        const bool needsSplit = static_cast<bool>(stage->distributedPlanLogic());
+        return !needsSplit;
+    };
+
+    auto [movableEnd, renameMap] = semantic_analysis::findLongestViablePrefixPreservingPaths(
+        mergeBegin, mergeStages.cend(), sortPaths, hasNoDistributedPlanLogic);
+
+    // Bail out if any sort path is missing from the rename map or maps to a different name.
+    for (const auto& sortPath : sortPaths) {
+        const auto entry = renameMap.find(sortPath);
+        const bool keptOriginalName = entry != renameMap.end() && entry->first == entry->second;
+        if (!keptOriginalName) {
+            return;
+        }
+    }
+
+    // Append [mergeBegin, movableEnd) to the shard pipeline, then drop it from the merge pipeline.
+    shardStages.insert(shardStages.end(), mergeBegin, movableEnd);
+    mergeStages.erase(mergeBegin, movableEnd);
+}
+
+/**
* Returns true if the final stage of the pipeline limits the number of documents it could output
* (such as a $limit stage).
*
@@ -775,6 +807,10 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
// The order in which optimizations are applied can have significant impact on the efficiency of
// the final pipeline. Be Careful!
+ if (inputsSort) {
+ moveEligibleStreamingStagesBeforeSortOnShards(
+ shardsPipeline.get(), mergePipeline.get(), *inputsSort);
+ }
moveFinalUnwindFromShardsToMerger(shardsPipeline.get(), mergePipeline.get());
propagateDocLimitToShards(shardsPipeline.get(), mergePipeline.get());
limitFieldsSentFromShardsToMerger(shardsPipeline.get(), mergePipeline.get());
@@ -857,7 +893,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// pipeline; if not, we retain the existing pipeline.
// - Call establishShardCursors to dispatch the aggregation to the targeted shards.
// - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
- // entire aggregation commmand.
+ // entire aggregation command.
auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
diff --git a/src/mongo/db/query/sbe_stage_builder_expression.cpp b/src/mongo/db/query/sbe_stage_builder_expression.cpp
index 5c68c224b85..6353d5e31a1 100644
--- a/src/mongo/db/query/sbe_stage_builder_expression.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_expression.cpp
@@ -1173,7 +1173,7 @@ public:
collatorSlot,
_context->planNodeId);
- // Create a branch stage to select between the branch that produces one null if any eleemnts
+ // Create a branch stage to select between the branch that produces one null if any elements
// in the original input were null or missing, or otherwise select the branch that unwinds
// and concatenates elements into the output array.
auto [nullExpr, nullStage] = makeNullLimitCoscanTree();
@@ -1790,7 +1790,7 @@ public:
}
sbe::value::SlotId slotId;
- if (expr->isRootFieldPath()) {
+ if (!expr->isVariableReference()) {
slotId = _context->rootSlot;
} else {
auto it = _context->environment.find(expr->getVariableId());