summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2022-05-11 19:55:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-11 20:20:17 +0000
commit768de6a405e5a41287d4368545e0a69a4d6bb676 (patch)
treee7b6b9df1f15ae3f1a5ec55c78c21013dcd348ed
parenta2dfdcccedd7d0240e199f917ed6c9ff799fbd93 (diff)
downloadmongo-768de6a405e5a41287d4368545e0a69a4d6bb676.tar.gz
SERVER-65692 $searchMeta improvement
-rw-r--r--jstests/aggregation/variables/search_meta.js13
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source.h21
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_mock_collection.cpp52
-rw-r--r--src/mongo/db/pipeline/document_source_mock_collection.h97
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp25
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h12
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp40
-rw-r--r--src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h27
-rw-r--r--src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl6
-rw-r--r--src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp30
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp7
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp35
-rw-r--r--src/mongo/executor/task_executor_cursor.h6
19 files changed, 81 insertions, 307 deletions
diff --git a/jstests/aggregation/variables/search_meta.js b/jstests/aggregation/variables/search_meta.js
index 4cf0409c674..ba9fe1430ae 100644
--- a/jstests/aggregation/variables/search_meta.js
+++ b/jstests/aggregation/variables/search_meta.js
@@ -40,7 +40,14 @@ assert.commandFailedWithCode(db.runCommand({
}),
ErrorCodes.FailedToParse);
-assert.throwsWithCode(
- () => db.non_existent_namespace.aggregate([{$searchMeta: {query: {nonsense: true}}}]),
- [6448001, 31082, 40324]); // Error code may change on mongos or on community server.
+const response = db.runCommand({
+ aggregate: "non_existent_namespace",
+ pipeline: [{$searchMeta: {query: {nonsense: true}}}],
+ cursor: {}
+});
+if (!response.ok) {
+ assert.commandFailedWithCode(response, [31082, 40324] /* community or mongos */);
+} else {
+ assert.eq(response.cursor.firstBatch, []);
+}
})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 01484843eba..6b14f635095 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -299,7 +299,6 @@ pipelineEnv.Library(
'document_source_lookup.cpp',
'document_source_match.cpp',
'document_source_merge.cpp',
- 'document_source_mock_collection.cpp',
'document_source_operation_metrics.cpp',
'document_source_out.cpp',
'document_source_plan_cache_stats.cpp',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index ff2aaf9f8ae..29b98369723 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -267,14 +267,29 @@ public:
* collection. Describes how a pipeline should be split for sharded execution.
*/
struct DistributedPlanLogic {
+ DistributedPlanLogic() = default;
+
+ /**
+ * Convenience constructor for the common case where there is at most one merging stage. Can
+ * pass nullptr for the merging stage which means "no merging required."
+ */
+ DistributedPlanLogic(boost::intrusive_ptr<DocumentSource> shardsStageIn,
+ boost::intrusive_ptr<DocumentSource> mergeStage,
+ boost::optional<BSONObj> mergeSortPatternIn = boost::none)
+ : shardsStage(std::move(shardsStageIn)),
+ mergeSortPattern(std::move(mergeSortPatternIn)) {
+ if (mergeStage)
+ mergingStages.emplace_back(std::move(mergeStage));
+ }
+
typedef std::function<bool(const DocumentSource&)> movePastFunctionType;
// A stage which executes on each shard in parallel, or nullptr if nothing can be done in
// parallel. For example, a partial $group before a subsequent global $group.
boost::intrusive_ptr<DocumentSource> shardsStage = nullptr;
- // A stage which executes after merging all the results together, or nullptr if nothing is
- // necessary after merging. For example, a $limit stage.
- boost::intrusive_ptr<DocumentSource> mergingStage = nullptr;
+ // A stage or stages which funciton to merge all the results together, or an empty list if
+ // nothing is necessary after merging. For example, a $limit stage.
+ std::list<boost::intrusive_ptr<DocumentSource>> mergingStages = {};
// If set, each document is expected to have sort key metadata which will be serialized in
// the '$sortKey' field. 'mergeSortPattern' will then be used to describe which fields are
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index f7fb2af6f92..d3ed11f2031 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -602,10 +602,12 @@ protected:
// case only one shard is in use.
auto distributedPlanLogic = group()->distributedPlanLogic();
ASSERT(distributedPlanLogic);
- ASSERT(distributedPlanLogic->mergingStage);
- ASSERT_NOT_EQUALS(group(), distributedPlanLogic->mergingStage);
+ ASSERT_EQ(distributedPlanLogic->mergingStages.size(), 1)
+ << distributedPlanLogic->mergingStages.size();
+ auto mergingStage = *distributedPlanLogic->mergingStages.begin();
+ ASSERT_NOT_EQUALS(group(), mergingStage);
ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->mergeSortPattern));
- return distributedPlanLogic->mergingStage;
+ return mergingStage;
}
void checkResultSet(const intrusive_ptr<DocumentSource>& sink) {
// Load the results from the DocumentSourceGroup and sort them by _id.
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index b6013459bef..a565fb60c26 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -751,7 +751,7 @@ TEST_F(DocumentSourceLookUpTest, LookupDistributedPlanLogic) {
expCtx);
ASSERT(lookupStage->distributedPlanLogic());
ASSERT(lookupStage->distributedPlanLogic()->shardsStage == nullptr);
- ASSERT(lookupStage->distributedPlanLogic()->mergingStage != nullptr);
+ ASSERT_EQ(lookupStage->distributedPlanLogic()->mergingStages.size(), 1);
}
TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) {
diff --git a/src/mongo/db/pipeline/document_source_mock_collection.cpp b/src/mongo/db/pipeline/document_source_mock_collection.cpp
deleted file mode 100644
index cce15db2788..00000000000
--- a/src/mongo/db/pipeline/document_source_mock_collection.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/pipeline/document_source_mock_collection.h"
-
-namespace mongo {
-
-REGISTER_INTERNAL_DOCUMENT_SOURCE(mockCollection,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceMockCollection::createFromBson,
- true);
-
-boost::intrusive_ptr<DocumentSource> DocumentSourceMockCollection::createFromBson(
- BSONElement arrayElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- // Since we've already determined the stage name matched, the parser for $mockCollection is the
- // same as the parser for $queue. Let's just re-use that parse function.
- return make_intrusive<DocumentSourceMockCollection>(
- DocumentSourceQueue::parseFromArray(arrayElem), expCtx);
-}
-
-Value DocumentSourceMockCollection::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return DocumentSourceQueue::serializeWithName(explain, kStageName);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_mock_collection.h b/src/mongo/db/pipeline/document_source_mock_collection.h
deleted file mode 100644
index 2637c710e3b..00000000000
--- a/src/mongo/db/pipeline/document_source_mock_collection.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2022-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_queue.h"
-
-namespace mongo {
-
-/**
- * This is pretty much funcitonally equivalent to $documents or $queue, but it pretends to be a
- * collection instead so it is usable on a normal namespace rather than a "collectionless" one.
- *
- * TODO SERVER-65534 this stage also works around the restriction that $documents must run on
- * mongos.
- */
-class DocumentSourceMockCollection : public DocumentSourceQueue {
-public:
- static constexpr StringData kStageName = "$mockCollection"_sd;
-
- DocumentSourceMockCollection(std::deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceQueue(std::move(results), expCtx, kStageName) {}
-
- ~DocumentSourceMockCollection() override = default;
-
- const char* getSourceName() const override {
- return kStageName.rawData();
- }
-
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override;
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const override {
- auto queueConstraints = DocumentSourceQueue::constraints(pipeState);
- // Technically this stage is independent in that it doesn't read any data. But the point of
- // this stage is to replace a real collection and pretend that it is the collection. So in
- // order to permit using this stage with a "real" (non-collectionless) namespace, we need to
- // set this to false.
- queueConstraints.isIndependentOfAnyCollection = false;
- // TODO SERVER-65534 This is a hacky workaround. In order to get this stage to work in a
- // pipeline followed by another stage which must run on a shard, we should be able to
- // forward the entire pipeline to a shard. That kind of 'host type requirement' doesn't
- // exist yet.
- queueConstraints.hostRequirement = StageConstraints::HostTypeRequirement::kNone;
- queueConstraints.requiredPosition = StageConstraints::PositionRequirement::kNone;
- return queueConstraints;
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
- // TODO SERVER-65534 This is a hacky workaround. In order to get this stage to work in a
- // pipeline with another stage which must run on a shard, we should be able to forward the
- // entire pipeline to a shard. Until we know how to do that though, it is correct but not
- // performant to do this instead:
-
- // {shards stage, merging stage, merge sort}.
- return DistributedPlanLogic{
- // We will ignore all results, so add a limit to reduce perf impact.
- DocumentSourceLimit::create(pExpCtx, 1),
- // This needs to happen in the merging half so that we don't repeat the data N
- // times where N is the number of shards with at least one chunk for this
- // collection.
- this,
- boost::none};
- }
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index 7f73c74061e..c724db5ae10 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -39,21 +39,16 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(queue,
boost::intrusive_ptr<DocumentSource> DocumentSourceQueue::createFromBson(
BSONElement arrayElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return DocumentSourceQueue::create(expCtx, parseFromArray(arrayElem));
-}
-
-std::deque<DocumentSource::GetNextResult> DocumentSourceQueue::parseFromArray(
- BSONElement arrayElem) {
uassert(5858201,
"literal documents specification must be an array",
arrayElem.type() == BSONType::Array);
- std::deque<GetNextResult> queue;
+ auto queue = DocumentSourceQueue::create(expCtx);
// arrayElem is an Array and can be iterated through by using .Obj() method
for (auto elem : arrayElem.Obj()) {
uassert(5858202,
"literal documents specification must be an array of objects",
elem.type() == BSONType::Object);
- queue.emplace_back(Document{elem.Obj()}.getOwned());
+ queue->emplace_back(Document{elem.Obj()}.getOwned());
}
return queue;
}
@@ -61,14 +56,7 @@ std::deque<DocumentSource::GetNextResult> DocumentSourceQueue::parseFromArray(
boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<StringData> aliasStageName) {
- return DocumentSourceQueue::create(expCtx, {}, aliasStageName);
-}
-
-boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::deque<GetNextResult> documents,
- boost::optional<StringData> aliasStageName) {
- return make_intrusive<DocumentSourceQueue>(std::move(documents), expCtx, aliasStageName);
+ return new DocumentSourceQueue({}, expCtx, aliasStageName);
}
DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results,
@@ -93,16 +81,11 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
}
Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- return serializeWithName(explain, kStageName);
-}
-
-Value DocumentSourceQueue::serializeWithName(boost::optional<ExplainOptions::Verbosity> explain,
- StringData stageName) const {
ValueArrayStream vals;
for (auto elem : _queue) {
vals << elem.getDocument().getOwned();
}
- return Value(DOC(stageName << vals.done()));
+ return Value(DOC(kStageName << vals.done()));
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index b2182b1ff9d..31dc128f6cb 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -48,11 +48,6 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<StringData> aliasStageName = boost::none);
- static boost::intrusive_ptr<DocumentSourceQueue> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::deque<GetNextResult> docs,
- boost::optional<StringData> aliasStageName = boost::none);
-
DocumentSourceQueue(std::deque<GetNextResult> results,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<StringData> aliasStageName = boost::none);
@@ -90,7 +85,7 @@ public:
return DepsTracker::SEE_NEXT;
}
- virtual boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
return boost::none;
}
@@ -111,11 +106,6 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
protected:
- static std::deque<DocumentSource::GetNextResult> parseFromArray(BSONElement arrayElem);
-
- Value serializeWithName(boost::optional<ExplainOptions::Verbosity> explain,
- StringData stageName) const;
-
GetNextResult doGetNext() override;
// Return documents from front of queue.
std::deque<GetNextResult> _queue;
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 1b5cb79ec7f..5ec204d1fa9 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -128,7 +128,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::dist
DistributedPlanLogic logic;
logic.shardsStage = this;
if (_size > 0) {
- logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size);
+ logic.mergingStages = {DocumentSourceLimit::create(pExpCtx, _size)};
}
// Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does
diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp
index f87810b1f06..a3c3cfaf8ef 100644
--- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp
+++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp
@@ -61,10 +61,6 @@ Value DocumentSourceSetVariableFromSubPipeline::serialize(
tassert(625298, "SubPipeline cannot be null during serialization", _subPipeline);
spec.setSetVariable(var);
spec.setPipeline(_subPipeline->serializeToBson(explain));
- if (_ifEmptyExpr) {
- auto ifEmptyBson = BSON("ifEmpty" << _ifEmptyExpr->serialize(bool(explain)));
- spec.setIfEmptyExpr(IDLAnyTypeOwned::parseFromBSON(ifEmptyBson.firstElement()));
- }
return Value(DOC(getSourceName() << spec.toBSON()));
}
@@ -95,35 +91,22 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSetVariableFromSubPipeline::c
std::unique_ptr<Pipeline, PipelineDeleter> pipeline =
Pipeline::parse(spec.getPipeline(), expCtx->copyForSubPipeline(expCtx->ns));
-
- boost::intrusive_ptr<Expression> ifEmptyExpr;
- if (spec.getIfEmptyExpr()) {
- // The semantics of 'ifEmpty' are to execute in the sub-pipeline's context. So for example,
- // it should see the sub-pipelines definition of "$$SEARCH_META", if there was one. So it's
- // important to use that pipeline's ExpressionContext and variables here during evaluation.
- ifEmptyExpr = Expression::parseOperand(pipeline->getContext().get(),
- spec.getIfEmptyExpr()->getElement(),
- pipeline->getContext()->variablesParseState);
- }
-
return DocumentSourceSetVariableFromSubPipeline::create(
- expCtx, std::move(pipeline), Variables::kSearchMetaId, ifEmptyExpr);
+ expCtx, std::move(pipeline), Variables::kSearchMetaId);
}
intrusive_ptr<DocumentSourceSetVariableFromSubPipeline>
DocumentSourceSetVariableFromSubPipeline::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> subpipeline,
- Variables::Id varID,
- boost::intrusive_ptr<Expression> ifEmptyExpression) {
+ Variables::Id varID) {
uassert(625290,
str::stream()
<< "SetVariableFromSubPipeline only allows setting $$SEARCH_META variable, '$$"
<< Variables::getBuiltinVariableName(varID) << "' is not allowed.",
!Variables::isUserDefinedVariable(varID) && varID == Variables::kSearchMetaId);
return intrusive_ptr<DocumentSourceSetVariableFromSubPipeline>(
- new DocumentSourceSetVariableFromSubPipeline(
- expCtx, std::move(subpipeline), varID, std::move(ifEmptyExpression)));
+ new DocumentSourceSetVariableFromSubPipeline(expCtx, std::move(subpipeline), varID));
};
DocumentSource::GetNextResult DocumentSourceSetVariableFromSubPipeline::doGetNext() {
@@ -132,20 +115,9 @@ DocumentSource::GetNextResult DocumentSourceSetVariableFromSubPipeline::doGetNex
"Expected to have already attached a cursor source to the pipeline",
!_subPipeline->peekFront()->constraints().requiresInputDocSource);
auto nextSubPipelineInput = _subPipeline->getNext();
- if (!nextSubPipelineInput) {
- uassert(625296,
- "No document returned from $SetVariableFromSubPipeline subpipeline, and no "
- "default expression",
- _ifEmptyExpr);
- auto exprResult =
- _ifEmptyExpr->evaluate(Document{}, &_subPipeline->getContext()->variables);
- uassert(6448001,
- str::stream() << "ifEmpty expression did not evaluate to an object: "
- << exprResult.toString() << " (_ifEmptyExpr: "
- << _ifEmptyExpr->serialize(true).toString() << ")",
- exprResult.isObject());
- nextSubPipelineInput = exprResult.getDocument();
- }
+ uassert(625296,
+ "No document returned from $SetVariableFromSubPipeline subpipeline",
+ nextSubPipelineInput);
uassert(625297,
"Multiple documents returned from $SetVariableFromSubPipeline subpipeline when "
"only one expected",
diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h
index ab0d4f5dd20..0fccb527fad 100644
--- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h
+++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h
@@ -31,8 +31,6 @@
#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_set_variable_from_subpipeline_gen.h"
-
namespace mongo {
@@ -46,8 +44,7 @@ public:
static boost::intrusive_ptr<DocumentSourceSetVariableFromSubPipeline> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> subpipeline,
- Variables::Id varID,
- boost::intrusive_ptr<Expression> defaultExpression = nullptr);
+ Variables::Id varID);
~DocumentSourceSetVariableFromSubPipeline() = default;
@@ -101,26 +98,10 @@ public:
protected:
DocumentSourceSetVariableFromSubPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> subpipeline,
- Variables::Id varID,
- boost::intrusive_ptr<Expression> ifEmptyExpr)
+ Variables::Id varID)
: DocumentSource(kStageName, expCtx),
_subPipeline(std::move(subpipeline)),
- _variableID(varID),
- _ifEmptyExpr(std::move(ifEmptyExpr)) {
- if (_ifEmptyExpr) {
- // The 'ifEmpty' expression runs if there were no results, so naturally will have no
- // document to use for expressions like "$a" to make sense. Here we assert that whatever
- // expression is given doesn't depend on any specific fields. It should be something
- // like a $$SEARCH_META variable or a constant, not something like {$ifNull: ["$a", 0]}.
- DepsTracker deps;
- _ifEmptyExpr->addDependencies(&deps);
- uassert(6448000,
- str::stream() << SetVariableFromSubPipelineSpec::kIfEmptyExprFieldName
- << " must not reference any field paths: "
- << _ifEmptyExpr->serialize(true /* explain */).toString(),
- !deps.needWholeDocument && deps.fields.empty());
- }
- }
+ _variableID(varID) {}
private:
@@ -128,8 +109,6 @@ private:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
std::unique_ptr<Pipeline, PipelineDeleter> _subPipeline;
Variables::Id _variableID;
- // An expression to use if the pipeline does not return any results.
- boost::intrusive_ptr<Expression> _ifEmptyExpr;
// $setVariableFromSubPipeline sets the value of $$SEARCH_META only on the first call to
// doGetNext().
bool _firstCallForInput = true;
diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl
index ea5ff570083..8bd754c1e64 100644
--- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl
+++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl
@@ -48,9 +48,3 @@ structs:
description: A required aggregation pipeline that will return at most one document.
optional: false
type: pipeline
- ifEmpty:
- description: An expression to run to set the variable value if the pipeline returned no
- results.
- optional: true
- cpp_name: IfEmptyExpr
- type: IDLAnyTypeOwned
diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp
index 7d66ee4b304..e7cc8d71a27 100644
--- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp
+++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp
@@ -159,35 +159,5 @@ TEST_F(DocumentSourceSetVariableFromSubPipelineTest, testDoGetNext) {
nullptr) == 0);
}
-TEST_F(DocumentSourceSetVariableFromSubPipelineTest, ReturnExpressionWhenNoResults) {
- const auto inputDocs =
- std::vector{Document{{"a", 1}}, Document{{"b", 1}}, Document{{"c", 1}}, Document{{"d", 1}}};
- const auto expCtx = getExpCtx();
- const auto mockSourceForSetVarStage =
- DocumentSourceMock::createForTest(inputDocs[0], getExpCtx());
- auto ctxForSubPipeline = expCtx->copyForSubPipeline(expCtx->ns);
- const auto mockSourceForSubPipeline =
- DocumentSourceMock::createForTest(inputDocs, ctxForSubPipeline);
- auto setVariableFromSubPipeline = DocumentSourceSetVariableFromSubPipeline::create(
- expCtx,
- Pipeline::create(
- {DocumentSourceMatch::create(BSON("does not exist" << 1), ctxForSubPipeline)},
- ctxForSubPipeline),
- Variables::kSearchMetaId,
- ExpressionConstant::create(ctxForSubPipeline.get(), Value{Document{{"fallback", 1}}}));
-
- setVariableFromSubPipeline->addSubPipelineInitialSource(mockSourceForSubPipeline);
- setVariableFromSubPipeline->setSource(mockSourceForSetVarStage.get());
-
- auto comparator = DocumentComparator();
- auto results = comparator.makeUnorderedDocumentSet();
- auto next = setVariableFromSubPipeline->getNext();
- ASSERT_TRUE(next.isAdvanced());
-
- ASSERT_TRUE(Value::compare(expCtx->variables.getValue(Variables::kSearchMetaId),
- Value((BSON("fallback" << 1))),
- nullptr) == 0);
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 90497967241..009b4e9defb 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -642,7 +642,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri
.serialize(SortPattern::SortKeySerialization::kForSortKeyMerging)
.toBson();
if (auto limit = getLimit()) {
- split.mergingStage = DocumentSourceLimit::create(pExpCtx, *limit);
+ split.mergingStages = {DocumentSourceLimit::create(pExpCtx, *limit)};
}
return split;
}
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index f27c519a5f3..013ef9b6a53 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -135,7 +135,7 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
ASSERT(sort()->distributedPlanLogic());
ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr);
- ASSERT(sort()->distributedPlanLogic()->mergingStage == nullptr);
+ ASSERT(sort()->distributedPlanLogic()->mergingStages.empty());
}
container.push_back(DocumentSourceLimit::create(expCtx, 10));
@@ -161,8 +161,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
ASSERT(sort()->distributedPlanLogic());
ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr);
- ASSERT(sort()->distributedPlanLogic()->mergingStage != nullptr);
- ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->distributedPlanLogic()->mergingStage.get()));
+ ASSERT_EQ(sort()->distributedPlanLogic()->mergingStages.size(), 1);
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(
+ sort()->distributedPlanLogic()->mergingStages.begin()->get()));
}
TEST_F(DocumentSourceSortTest, DoesNotPushProjectBeforeSelf) {
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 99eef08c31c..1854c07a97a 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -4031,8 +4031,8 @@ public:
boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
DistributedPlanLogic logic;
- logic.mergingStage = nullptr;
logic.shardsStage = this;
+ logic.mergingStages = {};
logic.mergeSortPattern = BSON("a" << 1);
logic.needsSplit = false;
logic.canMovePast = canMovePastDuringSplit;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index cf110d53f43..9a7b4df2242 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -280,6 +280,22 @@ boost::optional<BSONObj> getOwnedOrNone(boost::optional<BSONObj> obj) {
return boost::none;
}
+void addSplitStages(const DocumentSource::DistributedPlanLogic& distributedPlanLogic,
+ Pipeline* mergePipe,
+ Pipeline* shardPipe) {
+ // This stage must be split, split it normally.
+ // Add in reverse order since we add each to the front and this would flip the order otherwise.
+ for (auto reverseIt = distributedPlanLogic.mergingStages.rbegin();
+ reverseIt != distributedPlanLogic.mergingStages.rend();
+ ++reverseIt) {
+ tassert(6448012,
+ "A stage cannot simultaneously be present on both sides of a pipeline split",
+ distributedPlanLogic.shardsStage != *reverseIt);
+ mergePipe->addInitialSource(*reverseIt);
+ }
+ addMaybeNullStageToBack(shardPipe, distributedPlanLogic.shardsStage);
+}
+
/**
* Helper for find split point that handles the split after a stage that must be on
* the merging half of the pipeline defers being added to the merging pipeline.
@@ -303,11 +319,7 @@ finishFindSplitPointAfterDeferral(
// If this stage also would like to split, split here. Don't defer multiple stages.
if (auto distributedPlanLogic = current->distributedPlanLogic()) {
- // A source may not simultaneously be present on both sides of the split.
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
- // This stage must be split, split it normally.
- addMaybeNullStageToBack(shardPipe.get(), std::move(distributedPlanLogic->shardsStage));
- addMaybeNullStageToFront(mergePipe, std::move(distributedPlanLogic->mergingStage));
+ addSplitStages(*distributedPlanLogic, mergePipe, shardPipe.get());
// The sort that was earlier in the pipeline takes precedence.
if (!mergeSort) {
@@ -355,22 +367,21 @@ std::pair<std::unique_ptr<Pipeline, PipelineDeleter>, boost::optional<BSONObj>>
tassert(6253721,
"Must have deferral function if deferring pipeline split",
distributedPlanLogic->canMovePast);
+ auto mergingStageList = distributedPlanLogic->mergingStages;
+ tassert(6448007,
+ "Only support deferring at most one stage for now.",
+ mergingStageList.size() <= 1);
// We know these are all currently null/none, as if we had deferred something and
// 'current' did not need split we would have returned above.
return finishFindSplitPointAfterDeferral(
mergePipe,
std::move(shardPipe),
- std::move(distributedPlanLogic->mergingStage),
+ mergingStageList.empty() ? nullptr : std::move(*mergingStageList.begin()),
getOwnedOrNone(distributedPlanLogic->mergeSortPattern),
distributedPlanLogic->canMovePast);
}
- // A source may not simultaneously be present on both sides of the split.
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
-
- addMaybeNullStageToBack(shardPipe.get(), std::move(distributedPlanLogic->shardsStage));
- addMaybeNullStageToFront(mergePipe, std::move(distributedPlanLogic->mergingStage));
-
+ addSplitStages(*distributedPlanLogic, mergePipe, shardPipe.get());
return {std::move(shardPipe), getOwnedOrNone(distributedPlanLogic->mergeSortPattern)};
}
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index f7f545de2a5..b18ea481c88 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -134,15 +134,15 @@ public:
return toRet;
}
- boost::optional<BSONObj> getCursorVars() {
+ boost::optional<BSONObj> getCursorVars() const {
return _cursorVars;
}
- auto getType() {
+ auto getType() const {
return _cursorType;
}
- long long getBatchNum() {
+ long long getBatchNum() const {
return _batchNum;
}