diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2022-05-11 19:55:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-11 20:20:17 +0000 |
commit | 768de6a405e5a41287d4368545e0a69a4d6bb676 (patch) | |
tree | e7b6b9df1f15ae3f1a5ec55c78c21013dcd348ed | |
parent | a2dfdcccedd7d0240e199f917ed6c9ff799fbd93 (diff) | |
download | mongo-768de6a405e5a41287d4368545e0a69a4d6bb676.tar.gz |
SERVER-65692 $searchMeta improvement
19 files changed, 81 insertions, 307 deletions
diff --git a/jstests/aggregation/variables/search_meta.js b/jstests/aggregation/variables/search_meta.js index 4cf0409c674..ba9fe1430ae 100644 --- a/jstests/aggregation/variables/search_meta.js +++ b/jstests/aggregation/variables/search_meta.js @@ -40,7 +40,14 @@ assert.commandFailedWithCode(db.runCommand({ }), ErrorCodes.FailedToParse); -assert.throwsWithCode( - () => db.non_existent_namespace.aggregate([{$searchMeta: {query: {nonsense: true}}}]), - [6448001, 31082, 40324]); // Error code may change on mongos or on community server. +const response = db.runCommand({ + aggregate: "non_existent_namespace", + pipeline: [{$searchMeta: {query: {nonsense: true}}}], + cursor: {} +}); +if (!response.ok) { + assert.commandFailedWithCode(response, [31082, 40324] /* community or mongos */); +} else { + assert.eq(response.cursor.firstBatch, []); +} })(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 01484843eba..6b14f635095 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -299,7 +299,6 @@ pipelineEnv.Library( 'document_source_lookup.cpp', 'document_source_match.cpp', 'document_source_merge.cpp', - 'document_source_mock_collection.cpp', 'document_source_operation_metrics.cpp', 'document_source_out.cpp', 'document_source_plan_cache_stats.cpp', diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index ff2aaf9f8ae..29b98369723 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -267,14 +267,29 @@ public: * collection. Describes how a pipeline should be split for sharded execution. */ struct DistributedPlanLogic { + DistributedPlanLogic() = default; + + /** + * Convenience constructor for the common case where there is at most one merging stage. Can + * pass nullptr for the merging stage which means "no merging required." 
+ */ + DistributedPlanLogic(boost::intrusive_ptr<DocumentSource> shardsStageIn, + boost::intrusive_ptr<DocumentSource> mergeStage, + boost::optional<BSONObj> mergeSortPatternIn = boost::none) + : shardsStage(std::move(shardsStageIn)), + mergeSortPattern(std::move(mergeSortPatternIn)) { + if (mergeStage) + mergingStages.emplace_back(std::move(mergeStage)); + } + typedef std::function<bool(const DocumentSource&)> movePastFunctionType; // A stage which executes on each shard in parallel, or nullptr if nothing can be done in // parallel. For example, a partial $group before a subsequent global $group. boost::intrusive_ptr<DocumentSource> shardsStage = nullptr; - // A stage which executes after merging all the results together, or nullptr if nothing is - // necessary after merging. For example, a $limit stage. - boost::intrusive_ptr<DocumentSource> mergingStage = nullptr; + // A stage or stages which function to merge all the results together, or an empty list if + // nothing is necessary after merging. For example, a $limit stage. + std::list<boost::intrusive_ptr<DocumentSource>> mergingStages = {}; // If set, each document is expected to have sort key metadata which will be serialized in // the '$sortKey' field. 'mergeSortPattern' will then be used to describe which fields are diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index f7fb2af6f92..d3ed11f2031 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -602,10 +602,12 @@ protected: // case only one shard is in use. 
auto distributedPlanLogic = group()->distributedPlanLogic(); ASSERT(distributedPlanLogic); - ASSERT(distributedPlanLogic->mergingStage); - ASSERT_NOT_EQUALS(group(), distributedPlanLogic->mergingStage); + ASSERT_EQ(distributedPlanLogic->mergingStages.size(), 1) + << distributedPlanLogic->mergingStages.size(); + auto mergingStage = *distributedPlanLogic->mergingStages.begin(); + ASSERT_NOT_EQUALS(group(), mergingStage); ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->mergeSortPattern)); - return distributedPlanLogic->mergingStage; + return mergingStage; } void checkResultSet(const intrusive_ptr<DocumentSource>& sink) { // Load the results from the DocumentSourceGroup and sort them by _id. diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index b6013459bef..a565fb60c26 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -751,7 +751,7 @@ TEST_F(DocumentSourceLookUpTest, LookupDistributedPlanLogic) { expCtx); ASSERT(lookupStage->distributedPlanLogic()); ASSERT(lookupStage->distributedPlanLogic()->shardsStage == nullptr); - ASSERT(lookupStage->distributedPlanLogic()->mergingStage != nullptr); + ASSERT_EQ(lookupStage->distributedPlanLogic()->mergingStages.size(), 1); } TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) { diff --git a/src/mongo/db/pipeline/document_source_mock_collection.cpp b/src/mongo/db/pipeline/document_source_mock_collection.cpp deleted file mode 100644 index cce15db2788..00000000000 --- a/src/mongo/db/pipeline/document_source_mock_collection.cpp +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/pipeline/document_source_mock_collection.h" - -namespace mongo { - -REGISTER_INTERNAL_DOCUMENT_SOURCE(mockCollection, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceMockCollection::createFromBson, - true); - -boost::intrusive_ptr<DocumentSource> DocumentSourceMockCollection::createFromBson( - BSONElement arrayElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - // Since we've already determined the stage name matched, the parser for $mockCollection is the - // same as the parser for $queue. Let's just re-use that parse function. 
- return make_intrusive<DocumentSourceMockCollection>( - DocumentSourceQueue::parseFromArray(arrayElem), expCtx); -} - -Value DocumentSourceMockCollection::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { - return DocumentSourceQueue::serializeWithName(explain, kStageName); -} - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_mock_collection.h b/src/mongo/db/pipeline/document_source_mock_collection.h deleted file mode 100644 index 2637c710e3b..00000000000 --- a/src/mongo/db/pipeline/document_source_mock_collection.h +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright (C) 2022-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. 
If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_queue.h" - -namespace mongo { - -/** - * This is pretty much funcitonally equivalent to $documents or $queue, but it pretends to be a - * collection instead so it is usable on a normal namespace rather than a "collectionless" one. - * - * TODO SERVER-65534 this stage also works around the restriction that $documents must run on - * mongos. - */ -class DocumentSourceMockCollection : public DocumentSourceQueue { -public: - static constexpr StringData kStageName = "$mockCollection"_sd; - - DocumentSourceMockCollection(std::deque<GetNextResult> results, - const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceQueue(std::move(results), expCtx, kStageName) {} - - ~DocumentSourceMockCollection() override = default; - - const char* getSourceName() const override { - return kStageName.rawData(); - } - - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override; - - StageConstraints constraints(Pipeline::SplitState pipeState) const override { - auto queueConstraints = DocumentSourceQueue::constraints(pipeState); - // Technically this stage is independent in that it doesn't read any data. But the point of - // this stage is to replace a real collection and pretend that it is the collection. So in - // order to permit using this stage with a "real" (non-collectionless) namespace, we need to - // set this to false. - queueConstraints.isIndependentOfAnyCollection = false; - // TODO SERVER-65534 This is a hacky workaround. In order to get this stage to work in a - // pipeline followed by another stage which must run on a shard, we should be able to - // forward the entire pipeline to a shard. That kind of 'host type requirement' doesn't - // exist yet. 
- queueConstraints.hostRequirement = StageConstraints::HostTypeRequirement::kNone; - queueConstraints.requiredPosition = StageConstraints::PositionRequirement::kNone; - return queueConstraints; - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() override { - // TODO SERVER-65534 This is a hacky workaround. In order to get this stage to work in a - // pipeline with another stage which must run on a shard, we should be able to forward the - // entire pipeline to a shard. Until we know how to do that though, it is correct but not - // performant to do this instead: - - // {shards stage, merging stage, merge sort}. - return DistributedPlanLogic{ - // We will ignore all results, so add a limit to reduce perf impact. - DocumentSourceLimit::create(pExpCtx, 1), - // This needs to happen in the merging half so that we don't repeat the data N - // times where N is the number of shards with at least one chunk for this - // collection. - this, - boost::none}; - } - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp index 7f73c74061e..c724db5ae10 100644 --- a/src/mongo/db/pipeline/document_source_queue.cpp +++ b/src/mongo/db/pipeline/document_source_queue.cpp @@ -39,21 +39,16 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(queue, boost::intrusive_ptr<DocumentSource> DocumentSourceQueue::createFromBson( BSONElement arrayElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return DocumentSourceQueue::create(expCtx, parseFromArray(arrayElem)); -} - -std::deque<DocumentSource::GetNextResult> DocumentSourceQueue::parseFromArray( - BSONElement arrayElem) { uassert(5858201, "literal documents specification must be an array", arrayElem.type() == BSONType::Array); - std::deque<GetNextResult> queue; + auto queue = 
DocumentSourceQueue::create(expCtx); // arrayElem is an Array and can be iterated through by using .Obj() method for (auto elem : arrayElem.Obj()) { uassert(5858202, "literal documents specification must be an array of objects", elem.type() == BSONType::Object); - queue.emplace_back(Document{elem.Obj()}.getOwned()); + queue->emplace_back(Document{elem.Obj()}.getOwned()); } return queue; } @@ -61,14 +56,7 @@ std::deque<DocumentSource::GetNextResult> DocumentSourceQueue::parseFromArray( boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<StringData> aliasStageName) { - return DocumentSourceQueue::create(expCtx, {}, aliasStageName); -} - -boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::deque<GetNextResult> documents, - boost::optional<StringData> aliasStageName) { - return make_intrusive<DocumentSourceQueue>(std::move(documents), expCtx, aliasStageName); + return new DocumentSourceQueue({}, expCtx, aliasStageName); } DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results, @@ -93,16 +81,11 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { } Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - return serializeWithName(explain, kStageName); -} - -Value DocumentSourceQueue::serializeWithName(boost::optional<ExplainOptions::Verbosity> explain, - StringData stageName) const { ValueArrayStream vals; for (auto elem : _queue) { vals << elem.getDocument().getOwned(); } - return Value(DOC(stageName << vals.done())); + return Value(DOC(kStageName << vals.done())); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index b2182b1ff9d..31dc128f6cb 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ 
b/src/mongo/db/pipeline/document_source_queue.h @@ -48,11 +48,6 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<StringData> aliasStageName = boost::none); - static boost::intrusive_ptr<DocumentSourceQueue> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::deque<GetNextResult> docs, - boost::optional<StringData> aliasStageName = boost::none); - DocumentSourceQueue(std::deque<GetNextResult> results, const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<StringData> aliasStageName = boost::none); @@ -90,7 +85,7 @@ public: return DepsTracker::SEE_NEXT; } - virtual boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } @@ -111,11 +106,6 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); protected: - static std::deque<DocumentSource::GetNextResult> parseFromArray(BSONElement arrayElem); - - Value serializeWithName(boost::optional<ExplainOptions::Verbosity> explain, - StringData stageName) const; - GetNextResult doGetNext() override; // Return documents from front of queue. 
std::deque<GetNextResult> _queue; diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 1b5cb79ec7f..5ec204d1fa9 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -128,7 +128,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::dist DistributedPlanLogic logic; logic.shardsStage = this; if (_size > 0) { - logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size); + logic.mergingStages = {DocumentSourceLimit::create(pExpCtx, _size)}; } // Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp index f87810b1f06..a3c3cfaf8ef 100644 --- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp +++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.cpp @@ -61,10 +61,6 @@ Value DocumentSourceSetVariableFromSubPipeline::serialize( tassert(625298, "SubPipeline cannot be null during serialization", _subPipeline); spec.setSetVariable(var); spec.setPipeline(_subPipeline->serializeToBson(explain)); - if (_ifEmptyExpr) { - auto ifEmptyBson = BSON("ifEmpty" << _ifEmptyExpr->serialize(bool(explain))); - spec.setIfEmptyExpr(IDLAnyTypeOwned::parseFromBSON(ifEmptyBson.firstElement())); - } return Value(DOC(getSourceName() << spec.toBSON())); } @@ -95,35 +91,22 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSetVariableFromSubPipeline::c std::unique_ptr<Pipeline, PipelineDeleter> pipeline = Pipeline::parse(spec.getPipeline(), expCtx->copyForSubPipeline(expCtx->ns)); - - boost::intrusive_ptr<Expression> ifEmptyExpr; - if (spec.getIfEmptyExpr()) { - // The semantics of 'ifEmpty' are to execute in the sub-pipeline's context. 
So for example, - // it should see the sub-pipelines definition of "$$SEARCH_META", if there was one. So it's - // important to use that pipeline's ExpressionContext and variables here during evaluation. - ifEmptyExpr = Expression::parseOperand(pipeline->getContext().get(), - spec.getIfEmptyExpr()->getElement(), - pipeline->getContext()->variablesParseState); - } - return DocumentSourceSetVariableFromSubPipeline::create( - expCtx, std::move(pipeline), Variables::kSearchMetaId, ifEmptyExpr); + expCtx, std::move(pipeline), Variables::kSearchMetaId); } intrusive_ptr<DocumentSourceSetVariableFromSubPipeline> DocumentSourceSetVariableFromSubPipeline::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> subpipeline, - Variables::Id varID, - boost::intrusive_ptr<Expression> ifEmptyExpression) { + Variables::Id varID) { uassert(625290, str::stream() << "SetVariableFromSubPipeline only allows setting $$SEARCH_META variable, '$$" << Variables::getBuiltinVariableName(varID) << "' is not allowed.", !Variables::isUserDefinedVariable(varID) && varID == Variables::kSearchMetaId); return intrusive_ptr<DocumentSourceSetVariableFromSubPipeline>( - new DocumentSourceSetVariableFromSubPipeline( - expCtx, std::move(subpipeline), varID, std::move(ifEmptyExpression))); + new DocumentSourceSetVariableFromSubPipeline(expCtx, std::move(subpipeline), varID)); }; DocumentSource::GetNextResult DocumentSourceSetVariableFromSubPipeline::doGetNext() { @@ -132,20 +115,9 @@ DocumentSource::GetNextResult DocumentSourceSetVariableFromSubPipeline::doGetNex "Expected to have already attached a cursor source to the pipeline", !_subPipeline->peekFront()->constraints().requiresInputDocSource); auto nextSubPipelineInput = _subPipeline->getNext(); - if (!nextSubPipelineInput) { - uassert(625296, - "No document returned from $SetVariableFromSubPipeline subpipeline, and no " - "default expression", - _ifEmptyExpr); - auto exprResult = - 
_ifEmptyExpr->evaluate(Document{}, &_subPipeline->getContext()->variables); - uassert(6448001, - str::stream() << "ifEmpty expression did not evaluate to an object: " - << exprResult.toString() << " (_ifEmptyExpr: " - << _ifEmptyExpr->serialize(true).toString() << ")", - exprResult.isObject()); - nextSubPipelineInput = exprResult.getDocument(); - } + uassert(625296, + "No document returned from $SetVariableFromSubPipeline subpipeline", + nextSubPipelineInput); uassert(625297, "Multiple documents returned from $SetVariableFromSubPipeline subpipeline when " "only one expected", diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h index ab0d4f5dd20..0fccb527fad 100644 --- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h +++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.h @@ -31,8 +31,6 @@ #include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_set_variable_from_subpipeline_gen.h" - namespace mongo { @@ -46,8 +44,7 @@ public: static boost::intrusive_ptr<DocumentSourceSetVariableFromSubPipeline> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> subpipeline, - Variables::Id varID, - boost::intrusive_ptr<Expression> defaultExpression = nullptr); + Variables::Id varID); ~DocumentSourceSetVariableFromSubPipeline() = default; @@ -101,26 +98,10 @@ public: protected: DocumentSourceSetVariableFromSubPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> subpipeline, - Variables::Id varID, - boost::intrusive_ptr<Expression> ifEmptyExpr) + Variables::Id varID) : DocumentSource(kStageName, expCtx), _subPipeline(std::move(subpipeline)), - _variableID(varID), - _ifEmptyExpr(std::move(ifEmptyExpr)) { - if (_ifEmptyExpr) { - // The 'ifEmpty' expression runs if there were no results, so naturally 
will have no - // document to use for expressions like "$a" to make sense. Here we assert that whatever - // expression is given doesn't depend on any specific fields. It should be something - // like a $$SEARCH_META variable or a constant, not something like {$ifNull: ["$a", 0]}. - DepsTracker deps; - _ifEmptyExpr->addDependencies(&deps); - uassert(6448000, - str::stream() << SetVariableFromSubPipelineSpec::kIfEmptyExprFieldName - << " must not reference any field paths: " - << _ifEmptyExpr->serialize(true /* explain */).toString(), - !deps.needWholeDocument && deps.fields.empty()); - } - } + _variableID(varID) {} private: @@ -128,8 +109,6 @@ private: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; std::unique_ptr<Pipeline, PipelineDeleter> _subPipeline; Variables::Id _variableID; - // An expression to use if the pipeline does not return any results. - boost::intrusive_ptr<Expression> _ifEmptyExpr; // $setVariableFromSubPipeline sets the value of $$SEARCH_META only on the first call to // doGetNext(). bool _firstCallForInput = true; diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl index ea5ff570083..8bd754c1e64 100644 --- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl +++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline.idl @@ -48,9 +48,3 @@ structs: description: A required aggregation pipeline that will return at most one document. optional: false type: pipeline - ifEmpty: - description: An expression to run to set the variable value if the pipeline returned no - results. 
- optional: true - cpp_name: IfEmptyExpr - type: IDLAnyTypeOwned diff --git a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp index 7d66ee4b304..e7cc8d71a27 100644 --- a/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp +++ b/src/mongo/db/pipeline/document_source_set_variable_from_subpipeline_test.cpp @@ -159,35 +159,5 @@ TEST_F(DocumentSourceSetVariableFromSubPipelineTest, testDoGetNext) { nullptr) == 0); } -TEST_F(DocumentSourceSetVariableFromSubPipelineTest, ReturnExpressionWhenNoResults) { - const auto inputDocs = - std::vector{Document{{"a", 1}}, Document{{"b", 1}}, Document{{"c", 1}}, Document{{"d", 1}}}; - const auto expCtx = getExpCtx(); - const auto mockSourceForSetVarStage = - DocumentSourceMock::createForTest(inputDocs[0], getExpCtx()); - auto ctxForSubPipeline = expCtx->copyForSubPipeline(expCtx->ns); - const auto mockSourceForSubPipeline = - DocumentSourceMock::createForTest(inputDocs, ctxForSubPipeline); - auto setVariableFromSubPipeline = DocumentSourceSetVariableFromSubPipeline::create( - expCtx, - Pipeline::create( - {DocumentSourceMatch::create(BSON("does not exist" << 1), ctxForSubPipeline)}, - ctxForSubPipeline), - Variables::kSearchMetaId, - ExpressionConstant::create(ctxForSubPipeline.get(), Value{Document{{"fallback", 1}}})); - - setVariableFromSubPipeline->addSubPipelineInitialSource(mockSourceForSubPipeline); - setVariableFromSubPipeline->setSource(mockSourceForSetVarStage.get()); - - auto comparator = DocumentComparator(); - auto results = comparator.makeUnorderedDocumentSet(); - auto next = setVariableFromSubPipeline->getNext(); - ASSERT_TRUE(next.isAdvanced()); - - ASSERT_TRUE(Value::compare(expCtx->variables.getValue(Variables::kSearchMetaId), - Value((BSON("fallback" << 1))), - nullptr) == 0); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort.cpp 
b/src/mongo/db/pipeline/document_source_sort.cpp index 90497967241..009b4e9defb 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -642,7 +642,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri .serialize(SortPattern::SortKeySerialization::kForSortKeyMerging) .toBson(); if (auto limit = getLimit()) { - split.mergingStage = DocumentSourceLimit::create(pExpCtx, *limit); + split.mergingStages = {DocumentSourceLimit::create(pExpCtx, *limit)}; } return split; } diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index f27c519a5f3..013ef9b6a53 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -135,7 +135,7 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { ASSERT(sort()->distributedPlanLogic()); ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr); - ASSERT(sort()->distributedPlanLogic()->mergingStage == nullptr); + ASSERT(sort()->distributedPlanLogic()->mergingStages.empty()); } container.push_back(DocumentSourceLimit::create(expCtx, 10)); @@ -161,8 +161,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { ASSERT(sort()->distributedPlanLogic()); ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr); - ASSERT(sort()->distributedPlanLogic()->mergingStage != nullptr); - ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->distributedPlanLogic()->mergingStage.get())); + ASSERT_EQ(sort()->distributedPlanLogic()->mergingStages.size(), 1); + ASSERT(dynamic_cast<DocumentSourceLimit*>( + sort()->distributedPlanLogic()->mergingStages.begin()->get())); } TEST_F(DocumentSourceSortTest, DoesNotPushProjectBeforeSelf) { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 99eef08c31c..1854c07a97a 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ 
b/src/mongo/db/pipeline/pipeline_test.cpp @@ -4031,8 +4031,8 @@ public: boost::optional<DistributedPlanLogic> distributedPlanLogic() override { DistributedPlanLogic logic; - logic.mergingStage = nullptr; logic.shardsStage = this; + logic.mergingStages = {}; logic.mergeSortPattern = BSON("a" << 1); logic.needsSplit = false; logic.canMovePast = canMovePastDuringSplit; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index cf110d53f43..9a7b4df2242 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -280,6 +280,22 @@ boost::optional<BSONObj> getOwnedOrNone(boost::optional<BSONObj> obj) { return boost::none; } +void addSplitStages(const DocumentSource::DistributedPlanLogic& distributedPlanLogic, + Pipeline* mergePipe, + Pipeline* shardPipe) { + // This stage must be split, split it normally. + // Add in reverse order since we add each to the front and this would flip the order otherwise. + for (auto reverseIt = distributedPlanLogic.mergingStages.rbegin(); + reverseIt != distributedPlanLogic.mergingStages.rend(); + ++reverseIt) { + tassert(6448012, + "A stage cannot simultaneously be present on both sides of a pipeline split", + distributedPlanLogic.shardsStage != *reverseIt); + mergePipe->addInitialSource(*reverseIt); + } + addMaybeNullStageToBack(shardPipe, distributedPlanLogic.shardsStage); +} + /** * Helper for find split point that handles the split after a stage that must be on * the merging half of the pipeline defers being added to the merging pipeline. @@ -303,11 +319,7 @@ finishFindSplitPointAfterDeferral( // If this stage also would like to split, split here. Don't defer multiple stages. if (auto distributedPlanLogic = current->distributedPlanLogic()) { - // A source may not simultaneously be present on both sides of the split. 
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); - // This stage must be split, split it normally. - addMaybeNullStageToBack(shardPipe.get(), std::move(distributedPlanLogic->shardsStage)); - addMaybeNullStageToFront(mergePipe, std::move(distributedPlanLogic->mergingStage)); + addSplitStages(*distributedPlanLogic, mergePipe, shardPipe.get()); // The sort that was earlier in the pipeline takes precedence. if (!mergeSort) { @@ -355,22 +367,21 @@ std::pair<std::unique_ptr<Pipeline, PipelineDeleter>, boost::optional<BSONObj>> tassert(6253721, "Must have deferral function if deferring pipeline split", distributedPlanLogic->canMovePast); + auto mergingStageList = distributedPlanLogic->mergingStages; + tassert(6448007, + "Only support deferring at most one stage for now.", + mergingStageList.size() <= 1); // We know these are all currently null/none, as if we had deferred something and // 'current' did not need split we would have returned above. return finishFindSplitPointAfterDeferral( mergePipe, std::move(shardPipe), - std::move(distributedPlanLogic->mergingStage), + mergingStageList.empty() ? nullptr : std::move(*mergingStageList.begin()), getOwnedOrNone(distributedPlanLogic->mergeSortPattern), distributedPlanLogic->canMovePast); } - // A source may not simultaneously be present on both sides of the split. 
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); - - addMaybeNullStageToBack(shardPipe.get(), std::move(distributedPlanLogic->shardsStage)); - addMaybeNullStageToFront(mergePipe, std::move(distributedPlanLogic->mergingStage)); - + addSplitStages(*distributedPlanLogic, mergePipe, shardPipe.get()); return {std::move(shardPipe), getOwnedOrNone(distributedPlanLogic->mergeSortPattern)}; } diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index f7f545de2a5..b18ea481c88 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -134,15 +134,15 @@ public: return toRet; } - boost::optional<BSONObj> getCursorVars() { + boost::optional<BSONObj> getCursorVars() const { return _cursorVars; } - auto getType() { + auto getType() const { return _cursorType; } - long long getBatchNum() { + long long getBatchNum() const { return _batchNum; } |