diff options
author | James Wahlin <james.wahlin@10gen.com> | 2017-05-04 16:42:50 -0400 |
---|---|---|
committer | James Wahlin <james.wahlin@10gen.com> | 2017-05-19 16:44:50 -0400 |
commit | 7cc042a4f8d21354b36d44f6b3642d2795ecb9ee (patch) | |
tree | 62dfc213df5a16ad006fe660fea8aecc7dea8e32 | |
parent | 2aaa0eafa5f1c6e1c43c1f42fcf7975722c3fbfe (diff) | |
download | mongo-7cc042a4f8d21354b36d44f6b3642d2795ecb9ee.tar.gz |
SERVER-29072 Add support for $lookup into a sub-pipeline
-rw-r--r-- | jstests/aggregation/bugs/server19095.js | 2 | ||||
-rw-r--r-- | jstests/aggregation/sources/lookup/lookup_non_correlated.js | 64 | ||||
-rw-r--r-- | jstests/core/views/views_validation.js | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 208 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 60 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_test.cpp | 100 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 53 |
9 files changed, 447 insertions, 77 deletions
diff --git a/jstests/aggregation/bugs/server19095.js b/jstests/aggregation/bugs/server19095.js index e55539cedc7..5577faf7b4a 100644 --- a/jstests/aggregation/bugs/server19095.js +++ b/jstests/aggregation/bugs/server19095.js @@ -377,7 +377,7 @@ load("jstests/aggregation/extras/utils.js"); assertErrorCode(coll, [{$lookup: {localField: "a", from: "from", as: "same"}}], 4572); assertErrorCode(coll, [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], 40320); assertErrorCode( - coll, [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], 4572); + coll, [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], 40449); // All four field's values must be strings. assertErrorCode( diff --git a/jstests/aggregation/sources/lookup/lookup_non_correlated.js b/jstests/aggregation/sources/lookup/lookup_non_correlated.js new file mode 100644 index 00000000000..523eb37f8d4 --- /dev/null +++ b/jstests/aggregation/sources/lookup/lookup_non_correlated.js @@ -0,0 +1,64 @@ +// Cannot implicitly shard accessed collections as $lookup does not support sharded target +// collection. +// @tags: [assumes_unsharded_collection] + +/** + * Confirms that $lookup with a non-correlated foreign pipeline returns expected results. + */ +(function() { + "use strict"; + + const testDB = db.getSiblingDB("lookup_non_correlated"); + const localName = "local"; + const localColl = testDB.getCollection(localName); + localColl.drop(); + const foreignName = "foreign"; + const foreignColl = testDB.getCollection(foreignName); + foreignColl.drop(); + + assert.writeOK(localColl.insert({_id: "A"})); + assert.writeOK(localColl.insert({_id: "B"})); + assert.writeOK(localColl.insert({_id: "C"})); + + assert.writeOK(foreignColl.insert({_id: 1})); + assert.writeOK(foreignColl.insert({_id: 2})); + assert.writeOK(foreignColl.insert({_id: 3})); + + // Basic non-correlated lookup returns expected results. + let cursor = localColl.aggregate([ + {$match: {_id: {$in: ["B", "C"]}}}, + {$sort: {_id: 1}}, + {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}}, + ]); + + assert(cursor.hasNext()); + assert.docEq({_id: "B", foreignDocs: [{_id: 2}, {_id: 3}]}, cursor.next()); + assert(cursor.hasNext()); + assert.docEq({_id: "C", foreignDocs: [{_id: 2}, {_id: 3}]}, cursor.next()); + assert(!cursor.hasNext()); + + // Non-correlated lookup followed by unwind on 'as' returns expected results. + cursor = localColl.aggregate([ + {$match: {_id: "A"}}, + {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}}, + {$unwind: "$foreignDocs"} + ]); + + assert(cursor.hasNext()); + assert.docEq({_id: "A", foreignDocs: {_id: 2}}, cursor.next()); + assert(cursor.hasNext()); + assert.docEq({_id: "A", foreignDocs: {_id: 3}}, cursor.next()); + assert(!cursor.hasNext()); + + // Non-correlated lookup followed by unwind and filter on 'as' returns expected results. + cursor = localColl.aggregate([ + {$match: {_id: "A"}}, + {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}}, + {$unwind: "$foreignDocs"}, + {$match: {"foreignDocs._id": 2}} + ]); + + assert(cursor.hasNext()); + assert.docEq({_id: "A", foreignDocs: {_id: 2}}, cursor.next()); + assert(!cursor.hasNext()); +})(); diff --git a/jstests/core/views/views_validation.js b/jstests/core/views/views_validation.js index 84c7f1d3510..d713333c901 100644 --- a/jstests/core/views/views_validation.js +++ b/jstests/core/views/views_validation.js @@ -123,6 +123,6 @@ "collMod modified view to have invalid pipeline"); clear(); - // Check that invalid pipelines are disallowed. - makeView("a", "b", [{"$lookup": {from: "a"}}], 4572); // 4572 is for missing $lookup fields + // Check that invalid pipelines are disallowed. The following $lookup is missing the 'as' field. + makeView("a", "b", [{"$lookup": {from: "a", localField: "b", foreignField: "c"}}], 40449); }()); diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index f915ab47431..d2cdfddd0c5 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -63,16 +63,13 @@ constexpr long long AggregationRequest::kDefaultBatchSize; AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline) : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {} -StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( - NamespaceString nss, - const BSONObj& cmdObj, - boost::optional<ExplainOptions::Verbosity> explainVerbosity) { - // Parse required parameters. - auto pipelineElem = cmdObj[kPipelineName]; +StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON( + BSONElement pipelineElem) { + std::vector<BSONObj> pipeline; if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) { return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"}; } - std::vector<BSONObj> pipeline; + for (auto elem : pipelineElem.Obj()) { if (elem.type() != BSONType::Object) { return {ErrorCodes::TypeMismatch, @@ -81,7 +78,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( pipeline.push_back(elem.embeddedObject().getOwned()); } - AggregationRequest request(std::move(nss), std::move(pipeline)); + return std::move(pipeline); +} + +StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( + NamespaceString nss, + const BSONObj& cmdObj, + boost::optional<ExplainOptions::Verbosity> explainVerbosity) { + // Parse required parameters. + auto pipelineElem = cmdObj[kPipelineName]; + auto pipeline = AggregationRequest::parsePipelineFromBSON(pipelineElem); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + AggregationRequest request(std::move(nss), std::move(pipeline.getValue())); const std::initializer_list<StringData> optionsParsedElseWhere = {kPipelineName, kCommandName}; diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 14b4dad0e59..4feaea931a8 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -61,6 +61,12 @@ public: static constexpr long long kDefaultBatchSize = 101; /** + * Parse an aggregation pipeline definition from 'pipelineElem'. Returns a non-OK status if + * pipeline is not an array or if any of the array elements are not objects. + */ + static StatusWith<std::vector<BSONObj>> parsePipelineFromBSON(BSONElement pipelineElem); + + /** * Create a new instance of AggregationRequest by parsing the raw command object. Returns a * non-OK status if a required field was missing, if there was an unrecognized field name or if * there was a bad value for one of the fields. diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 4c68db3f1da..70e5dffaf2e 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -44,27 +44,63 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; +namespace { +std::string pipelineToString(const vector<BSONObj>& pipeline) { + StringBuilder sb; + sb << "["; + + auto first = true; + for (auto& stageSpec : pipeline) { + if (!first) { + sb << ", "; + } else { + first = false; + } + sb << stageSpec; + } + sb << "]"; + return sb.str(); +} +} // namespace + DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, - std::string localField, - std::string foreignField, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx), - _fromNs(std::move(fromNs)), - _as(std::move(as)), - _localField(std::move(localField)), - _foreignField(foreignField), - _foreignFieldFieldName(std::move(foreignField)) { + : DocumentSourceNeedsMongod(pExpCtx), _fromNs(std::move(fromNs)), _as(std::move(as)) { const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs); _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns); _fromPipeline = resolvedNamespace.pipeline; + _resolvedNs = std::move(resolvedNamespace.ns); +} +DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, + std::string as, + std::string localField, + std::string foreignField, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSourceLookUp(fromNs, as, pExpCtx) { + _localField = std::move(localField); + _foreignField = std::move(foreignField); // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage // we'll eventually construct from the input document. _fromPipeline.reserve(_fromPipeline.size() + 1); _fromPipeline.push_back(BSONObj()); } +DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, + std::string as, + std::vector<BSONObj> pipeline, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSourceLookUp(fromNs, as, pExpCtx) { + // '_fromPipeline' will first be initialized by the constructor delegated to within this + // constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs' + // represents a view. We append the user 'pipeline' to the end of '_fromPipeline' to ensure any + // view prefix is not overwritten. + _fromPipeline.insert(_fromPipeline.end(), pipeline.begin(), pipeline.end()); + + _userPipeline = std::move(pipeline); +} + std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> DocumentSourceLookUp::liteParse( const AggregationRequest& request, const BSONElement& spec) { uassert(40319, @@ -121,15 +157,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const vector<Value>& DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { pExpCtx->checkForInterrupt(); - if (!_additionalFilter && _matchSrc) { - // We have internalized a $match, but have not yet computed the descended $match that should - // be applied to our queries. - _additionalFilter = DocumentSourceMatch::descendMatchOnPath( - _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx) - ->getQuery(); - } - - if (_handlingUnwind) { + if (_unwindSrc) { return unwindResult(); } @@ -140,22 +168,27 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { auto inputDoc = nextInput.releaseDocument(); // If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind, - // '_handlingUnwind' would be set to true, and we would not have made it here. + // '_unwindSrc' would be non-null, and we would not have made it here. invariant(!_matchSrc); - auto matchStage = - makeMatchStageFromInput(inputDoc, _localField, _foreignFieldFieldName, BSONObj()); - // We've already allocated space for the trailing $match stage in '_fromPipeline'. - _fromPipeline.back() = matchStage; + if (!wasConstructedWithPipelineSyntax()) { + auto matchStage = + makeMatchStageFromInput(inputDoc, *_localField, _foreignField->fullPath(), BSONObj()); + // We've already allocated space for the trailing $match stage in '_fromPipeline'. + _fromPipeline.back() = matchStage; + } + auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); std::vector<Value> results; int objsize = 0; + while (auto result = pipeline->getNext()) { objsize += result->getApproximateSize(); uassert(4568, - str::stream() << "Total size of documents in " << _fromNs.coll() << " matching " - << matchStage + str::stream() << "Total size of documents in " << _fromNs.coll() + << " matching pipeline " + << getUserPipelineDefinition() << " exceeds maximum document size", objsize <= BSONObjMaxInternalSize); results.emplace_back(std::move(*result)); @@ -185,9 +218,8 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( // If we are not already handling an $unwind stage internally, we can combine with the // following $unwind stage. - if (nextUnwind && !_handlingUnwind && nextUnwind->getUnwindPath() == _as.fullPath()) { + if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) { _unwindSrc = std::move(nextUnwind); - _handlingUnwind = true; container->erase(std::next(itr)); return itr; } @@ -199,7 +231,7 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( return std::next(itr); } - if (!_handlingUnwind || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) { + if (!_unwindSrc || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) { // We must be unwinding our result to internalize a $match. For example, consider the // following pipeline: // @@ -266,9 +298,8 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( } // We can internalize the $match. - if (!_handlingMatch) { + if (!_matchSrc) { _matchSrc = nextMatch; - _handlingMatch = true; } else { // We have already absorbed a $match. We need to join it with 'dependent'. _matchSrc->joinMatchWith(nextMatch); @@ -277,9 +308,30 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( // Remove the original $match. There may be further optimization between this $lookup and the // new neighbor, so we return an iterator pointing to ourself. container->erase(std::next(itr)); + + // We have internalized a $match, but have not yet computed the descended $match that should + // be applied to our queries. + _additionalFilter = DocumentSourceMatch::descendMatchOnPath( + _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx) + ->getQuery() + .getOwned(); + + if (wasConstructedWithPipelineSyntax()) { + auto matchObj = BSON("$match" << *_additionalFilter); + _fromPipeline.push_back(matchObj); + } + return itr; } +std::string DocumentSourceLookUp::getUserPipelineDefinition() { + if (wasConstructedWithPipelineSyntax()) { + return pipelineToString(_userPipeline); + } + + return _fromPipeline.back().toString(); +} + void DocumentSourceLookUp::doDispose() { if (_pipeline) { _pipeline->dispose(pExpCtx->opCtx); @@ -376,11 +428,13 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() { _input = nextInput.releaseDocument(); - BSONObj filter = _additionalFilter.value_or(BSONObj()); - auto matchStage = - makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter); - // We've already allocated space for the trailing $match stage in '_fromPipeline'. - _fromPipeline.back() = matchStage; + if (!wasConstructedWithPipelineSyntax()) { + BSONObj filter = _additionalFilter.value_or(BSONObj()); + auto matchStage = + makeMatchStageFromInput(*_input, *_localField, _foreignField->fullPath(), filter); + // We've already allocated space for the trailing $match stage in '_fromPipeline'. + _fromPipeline.back() = matchStage; + } if (_pipeline) { _pipeline->dispose(pExpCtx->opCtx); @@ -427,13 +481,23 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() { void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { - MutableDocument output(DOC( - getSourceName() << DOC("from" << _fromNs.coll() << "as" << _as.fullPath() << "localField" - << _localField.fullPath() - << "foreignField" - << _foreignField.fullPath()))); + Document doc; + if (wasConstructedWithPipelineSyntax()) { + doc = Document{{getSourceName(), + Document{{"from", _resolvedNs.coll()}, + {"as", _as.fullPath()}, + {"pipeline", _fromPipeline}}}}; + } else { + doc = Document{{getSourceName(), + {Document{{"from", _fromNs.coll()}, + {"as", _as.fullPath()}, + {"localField", _localField->fullPath()}, + {"foreignField", _foreignField->fullPath()}}}}}; + } + + MutableDocument output(doc); if (explain) { - if (_handlingUnwind) { + if (_unwindSrc) { const boost::optional<FieldPath> indexPath = _unwindSrc->indexPath(); output[getSourceName()]["unwinding"] = Value(DOC("preserveNullAndEmptyArrays" @@ -442,24 +506,24 @@ void DocumentSourceLookUp::serializeToArray( << (indexPath ? Value(indexPath->fullPath()) : Value()))); } - if (_matchSrc) { + // Only add _matchSrc for explain when $lookup was constructed with localField/foreignField + // syntax. For pipeline sytax, _matchSrc will be included as part of the pipeline + // definition. + if (!wasConstructedWithPipelineSyntax() && _additionalFilter) { // Our output does not have to be parseable, so include a "matching" field with the // descended match expression. - output[getSourceName()]["matching"] = - Value(DocumentSourceMatch::descendMatchOnPath( - _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx) - ->getQuery()); + output[getSourceName()]["matching"] = Value(*_additionalFilter); } array.push_back(Value(output.freeze())); } else { array.push_back(Value(output.freeze())); - if (_handlingUnwind) { + if (_unwindSrc) { _unwindSrc->serializeToArray(array); } - if (_matchSrc) { + if (!wasConstructedWithPipelineSyntax() && _matchSrc) { // '_matchSrc' tracks the originally specified $match. We descend upon the $match in the // first call to getNext(), at which point we are confident that we no longer need to // serialize the $lookup again. @@ -469,7 +533,10 @@ void DocumentSourceLookUp::serializeToArray( } DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { - deps->fields.insert(_localField.fullPath()); + // As current pipeline syntax only supports non-correlated join, it precludes dependencies. + if (!wasConstructedWithPipelineSyntax()) { + deps->fields.insert(_localField->fullPath()); + } return SEE_NEXT; } @@ -503,13 +570,28 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( std::string as; std::string localField; std::string foreignField; + std::vector<BSONObj> pipeline; + bool hasPipeline = false; for (auto&& argument : elem.Obj()) { + const auto argName = argument.fieldNameStringData(); + + if (argName == "pipeline") { + auto result = AggregationRequest::parsePipelineFromBSON(argument); + if (!result.isOK()) { + uasserted(40447, + str::stream() << "invalid $lookup pipeline definition: " + << result.getStatus().toString()); + } + pipeline = std::move(result.getValue()); + hasPipeline = true; + continue; + } + uassert(4570, - str::stream() << "arguments to $lookup must be strings, " << argument << " is type " + str::stream() << "$lookup '" << argument << "' must be a string, is type " << argument.type(), argument.type() == String); - const auto argName = argument.fieldNameStringData(); if (argName == "from") { fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String()); @@ -525,11 +607,27 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( } } - uassert(4572, - "need to specify fields from, as, localField, and foreignField for a $lookup", - !fromNs.ns().empty() && !as.empty() && !localField.empty() && !foreignField.empty()); + uassert(40451, "must specify 'from' field for a $lookup", !fromNs.ns().empty()); + uassert(40449, "must specify 'as' field for a $lookup", !as.empty()); - return new DocumentSourceLookUp( - std::move(fromNs), std::move(as), std::move(localField), std::move(foreignField), pExpCtx); + if (hasPipeline) { + uassert(40450, + "$lookup with 'pipeline' may not specify 'localField' or 'foreignField'", + localField.empty() || foreignField.empty()); + + return new DocumentSourceLookUp( + std::move(fromNs), std::move(as), std::move(pipeline), pExpCtx); + } else { + uassert(4572, + "$lookup requires either 'pipeline' or both 'localField' and 'foreignField' to be " + "specified", + !localField.empty() && !foreignField.empty()); + + return new DocumentSourceLookUp(std::move(fromNs), + std::move(as), + std::move(localField), + std::move(foreignField), + pExpCtx); + } } } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index e696e8bb4e5..50b02501189 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -103,9 +103,16 @@ public: * Helper to absorb an $unwind stage. Only used for testing this special behavior. */ void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) { - invariant(!_handlingUnwind); + invariant(!_unwindSrc); _unwindSrc = unwind; - _handlingUnwind = true; + } + + /** + * Returns true if DocumentSourceLookup was constructed with pipeline syntax (as opposed to + * localField/foreignField syntax). + */ + bool wasConstructedWithPipelineSyntax() const { + return !static_cast<bool>(_localField); } protected: @@ -119,41 +126,72 @@ protected: Pipeline::SourceContainer* container) final; private: + /** + * Target constructor. Handles common-field initialization for the syntax-specific delegating + * constructors. + */ + DocumentSourceLookUp(NamespaceString fromNs, + std::string as, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Constructor used for a $lookup stage specified using the {from: ..., localField: ..., + * foreignField: ..., as: ...} syntax. + */ DocumentSourceLookUp(NamespaceString fromNs, std::string as, std::string localField, std::string foreignField, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + /** + * Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as: + * ...} syntax. + */ + DocumentSourceLookUp(NamespaceString fromNs, + std::string as, + std::vector<BSONObj> pipeline, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Should not be called; use serializeToArray instead. + */ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - // Should not be called; use serializeToArray instead. MONGO_UNREACHABLE; } GetNextResult unwindResult(); + /** + * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that + * is executed in that it will not include optimizations or resolved views. + */ + std::string getUserPipelineDefinition(); + NamespaceString _fromNs; + NamespaceString _resolvedNs; FieldPath _as; - FieldPath _localField; - FieldPath _foreignField; - std::string _foreignFieldFieldName; boost::optional<BSONObj> _additionalFilter; + // For use when $lookup is specified with localField/foreignField syntax. + boost::optional<FieldPath> _localField; + boost::optional<FieldPath> _foreignField; + // The ExpressionContext used when performing aggregation pipelines against the '_fromNs' // namespace. boost::intrusive_ptr<ExpressionContext> _fromExpCtx; // The aggregation pipeline to perform against the '_fromNs' namespace. std::vector<BSONObj> _fromPipeline; + // The pipeline defined with the user request, prior to optimization and addition of any view + // definitions. + std::vector<BSONObj> _userPipeline; boost::intrusive_ptr<DocumentSourceMatch> _matchSrc; boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc; - bool _handlingUnwind = false; - bool _handlingMatch = false; - - // The following members are used to hold onto state across getNext() calls when - // '_handlingUnwind' is true. + // The following members are used to hold onto state across getNext() calls when '_unwindSrc' is + // not null. long long _cursorIndex = 0; std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline; boost::optional<Document> _input; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 4e8a59bd75a..e43d8efb5b6 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -99,6 +99,106 @@ TEST_F(DocumentSourceLookUpTest, ShouldTruncateOutputSortOnSuffixOfAsField) { ASSERT_EQUALS(outputSort.size(), 1U); } +TEST_F(DocumentSourceLookUpTest, AcceptsPipelineSyntax) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" + << "coll" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) + << "as" + << "as")) + .firstElement(), + expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax()); +} + +TEST_F(DocumentSourceLookUpTest, RejectsLocalFieldForeignFieldWhenPipelineIsSpecified) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + try { + auto lookupStage = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" + << "coll" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) + << "localField" + << "a" + << "foreignField" + << "b" + << "as" + << "as")) + .firstElement(), + expCtx); + + FAIL(str::stream() + << "Expected creation of the " + << lookupStage->getSourceName() + << " stage to uassert on mix of localField/foreignField and pipeline options"); + } catch (const UserException& ex) { + ASSERT_EQ(40450, ex.getCode()); + } +} + +TEST_F(DocumentSourceLookUpTest, ShouldBeAbleToReParseSerializedStage) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto lookupStage = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" + << "coll" + << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))) + << "as" + << "as")) + .firstElement(), + expCtx); + + // + // Serialize the $lookup stage and confirm contents. + // + vector<Value> serialization; + lookupStage->serializeToArray(serialization); + ASSERT_EQ(serialization.size(), 1UL); + ASSERT_EQ(serialization[0].getType(), BSONType::Object); + + // The fields are in no guaranteed order, so we can't perform a simple Document comparison. + auto serializedDoc = serialization[0].getDocument(); + ASSERT_EQ(serializedDoc["$lookup"].getType(), BSONType::Object); + + auto serializedStage = serializedDoc["$lookup"].getDocument(); + ASSERT_EQ(serializedStage.size(), 3UL); + ASSERT_VALUE_EQ(serializedStage["from"], Value(std::string("coll"))); + ASSERT_VALUE_EQ(serializedStage["as"], Value(std::string("as"))); + + ASSERT_EQ(serializedStage["pipeline"].getType(), BSONType::Array); + ASSERT_EQ(serializedStage["pipeline"].getArrayLength(), 1UL); + + ASSERT_EQ(serializedStage["pipeline"][0].getType(), BSONType::Object); + ASSERT_DOCUMENT_EQ(serializedStage["pipeline"][0]["$match"].getDocument(), + Document(fromjson("{x: 1}"))); + + // + // Create a new $lookup stage from the serialization. Serialize the new stage and confirm that + // it is equivalent to the original serialization. + // + auto serializedBson = serializedDoc.toBson(); + auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBson.firstElement(), expCtx); + + vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) { auto input = Document{{"local", 1}}; BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput( diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index e38263b9785..a0f50825b49 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -227,6 +227,21 @@ TEST(PipelineOptimizationTest, LookupShouldCoalesceWithUnwindOnAs) { assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); } +TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldCoalesceWithUnwindOnAs) { + string inputPipe = + "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}" + ",{$unwind: {path: '$same'}}" + "]"; + string outputPipe = + "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: [], " + "unwinding: {preserveNullAndEmptyArrays: false}}}]"; + string serializedPipe = + "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}" + ",{$unwind: {path: '$same'}}" + "]"; + assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); +} + TEST(PipelineOptimizationTest, LookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty) { string inputPipe = "[{$lookup: {from : 'lookupColl', as : 'same', localField: 'left', foreignField: " @@ -276,6 +291,18 @@ TEST(PipelineOptimizationTest, LookupShouldNotCoalesceWithUnwindNotOnAs) { assertPipelineOptimizesTo(inputPipe, outputPipe); } +TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldNotCoalesceWithUnwindNotOnAs) { + string inputPipe = + "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}" + ",{$unwind: {path: '$from'}}" + "]"; + string outputPipe = + "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}" + ",{$unwind: {path: '$from'}}" + "]"; + assertPipelineOptimizesTo(inputPipe, outputPipe); +} + TEST(PipelineOptimizationTest, LookupShouldSwapWithMatch) { string inputPipe = "[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: " @@ -288,6 +315,16 @@ TEST(PipelineOptimizationTest, LookupShouldSwapWithMatch) { assertPipelineOptimizesTo(inputPipe, outputPipe); } +TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldSwapWithMatch) { + string inputPipe = + "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}, " + " {$match: {'independent': 0}}]"; + string outputPipe = + "[{$match: {independent: 0}}, " + " {$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}]"; + assertPipelineOptimizesTo(inputPipe, outputPipe); +} + TEST(PipelineOptimizationTest, LookupShouldSplitMatch) { string inputPipe = "[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: " @@ -331,6 +368,22 @@ TEST(PipelineOptimizationTest, LookupShouldAbsorbUnwindMatch) { assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); } +TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldAbsorbUnwindMatch) { + string inputPipe = + "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}, " + "{$unwind: '$asField'}, " + "{$match: {'asField.subfield': {$eq: 1}}}]"; + string outputPipe = + "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: [{$match: {subfield: {$eq: " + "1}}}], " + "unwinding: {preserveNullAndEmptyArrays: false} } } ]"; + string serializedPipe = + "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: [{$match: {subfield: {$eq: " + "1}}}]}}, " + "{$unwind: {path: '$asField'}}]"; + assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); +} + TEST(PipelineOptimizationTest, LookupShouldAbsorbUnwindAndSplitAndAbsorbMatch) { string inputPipe = "[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: " |