summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author James Wahlin <james.wahlin@10gen.com> 2017-05-04 16:42:50 -0400
committer James Wahlin <james.wahlin@10gen.com> 2017-05-19 16:44:50 -0400
commit 7cc042a4f8d21354b36d44f6b3642d2795ecb9ee (patch)
tree 62dfc213df5a16ad006fe660fea8aecc7dea8e32
parent 2aaa0eafa5f1c6e1c43c1f42fcf7975722c3fbfe (diff)
download mongo-7cc042a4f8d21354b36d44f6b3642d2795ecb9ee.tar.gz
SERVER-29072 Add support for $lookup into a sub-pipeline
-rw-r--r-- jstests/aggregation/bugs/server19095.js 2
-rw-r--r-- jstests/aggregation/sources/lookup/lookup_non_correlated.js 64
-rw-r--r-- jstests/core/views/views_validation.js 4
-rw-r--r-- src/mongo/db/pipeline/aggregation_request.cpp 27
-rw-r--r-- src/mongo/db/pipeline/aggregation_request.h 6
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.cpp 208
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.h 60
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup_test.cpp 100
-rw-r--r-- src/mongo/db/pipeline/pipeline_test.cpp 53
9 files changed, 447 insertions, 77 deletions
diff --git a/jstests/aggregation/bugs/server19095.js b/jstests/aggregation/bugs/server19095.js
index e55539cedc7..5577faf7b4a 100644
--- a/jstests/aggregation/bugs/server19095.js
+++ b/jstests/aggregation/bugs/server19095.js
@@ -377,7 +377,7 @@ load("jstests/aggregation/extras/utils.js");
assertErrorCode(coll, [{$lookup: {localField: "a", from: "from", as: "same"}}], 4572);
assertErrorCode(coll, [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], 40320);
assertErrorCode(
- coll, [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], 4572);
+ coll, [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], 40449);
// All four field's values must be strings.
assertErrorCode(
diff --git a/jstests/aggregation/sources/lookup/lookup_non_correlated.js b/jstests/aggregation/sources/lookup/lookup_non_correlated.js
new file mode 100644
index 00000000000..523eb37f8d4
--- /dev/null
+++ b/jstests/aggregation/sources/lookup/lookup_non_correlated.js
@@ -0,0 +1,64 @@
+// Cannot implicitly shard accessed collections as $lookup does not support sharded target
+// collection.
+// @tags: [assumes_unsharded_collection]
+
+/**
+ * Confirms that $lookup with a non-correlated foreign pipeline returns expected results.
+ */
+(function() {
+ "use strict";
+
+ const testDB = db.getSiblingDB("lookup_non_correlated");
+ const localName = "local";
+ const localColl = testDB.getCollection(localName);
+ localColl.drop();
+ const foreignName = "foreign";
+ const foreignColl = testDB.getCollection(foreignName);
+ foreignColl.drop();
+
+ assert.writeOK(localColl.insert({_id: "A"}));
+ assert.writeOK(localColl.insert({_id: "B"}));
+ assert.writeOK(localColl.insert({_id: "C"}));
+
+ assert.writeOK(foreignColl.insert({_id: 1}));
+ assert.writeOK(foreignColl.insert({_id: 2}));
+ assert.writeOK(foreignColl.insert({_id: 3}));
+
+ // Basic non-correlated lookup returns expected results.
+ let cursor = localColl.aggregate([
+ {$match: {_id: {$in: ["B", "C"]}}},
+ {$sort: {_id: 1}},
+ {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}},
+ ]);
+
+ assert(cursor.hasNext());
+ assert.docEq({_id: "B", foreignDocs: [{_id: 2}, {_id: 3}]}, cursor.next());
+ assert(cursor.hasNext());
+ assert.docEq({_id: "C", foreignDocs: [{_id: 2}, {_id: 3}]}, cursor.next());
+ assert(!cursor.hasNext());
+
+ // Non-correlated lookup followed by unwind on 'as' returns expected results.
+ cursor = localColl.aggregate([
+ {$match: {_id: "A"}},
+ {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}},
+ {$unwind: "$foreignDocs"}
+ ]);
+
+ assert(cursor.hasNext());
+ assert.docEq({_id: "A", foreignDocs: {_id: 2}}, cursor.next());
+ assert(cursor.hasNext());
+ assert.docEq({_id: "A", foreignDocs: {_id: 3}}, cursor.next());
+ assert(!cursor.hasNext());
+
+ // Non-correlated lookup followed by unwind and filter on 'as' returns expected results.
+ cursor = localColl.aggregate([
+ {$match: {_id: "A"}},
+ {$lookup: {from: foreignName, as: "foreignDocs", pipeline: [{$match: {_id: {"$gte": 2}}}]}},
+ {$unwind: "$foreignDocs"},
+ {$match: {"foreignDocs._id": 2}}
+ ]);
+
+ assert(cursor.hasNext());
+ assert.docEq({_id: "A", foreignDocs: {_id: 2}}, cursor.next());
+ assert(!cursor.hasNext());
+})();
diff --git a/jstests/core/views/views_validation.js b/jstests/core/views/views_validation.js
index 84c7f1d3510..d713333c901 100644
--- a/jstests/core/views/views_validation.js
+++ b/jstests/core/views/views_validation.js
@@ -123,6 +123,6 @@
"collMod modified view to have invalid pipeline");
clear();
- // Check that invalid pipelines are disallowed.
- makeView("a", "b", [{"$lookup": {from: "a"}}], 4572); // 4572 is for missing $lookup fields
+ // Check that invalid pipelines are disallowed. The following $lookup is missing the 'as' field.
+ makeView("a", "b", [{"$lookup": {from: "a", localField: "b", foreignField: "c"}}], 40449);
}());
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index f915ab47431..d2cdfddd0c5 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -63,16 +63,13 @@ constexpr long long AggregationRequest::kDefaultBatchSize;
AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
: _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
-StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
- NamespaceString nss,
- const BSONObj& cmdObj,
- boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
- // Parse required parameters.
- auto pipelineElem = cmdObj[kPipelineName];
+StatusWith<std::vector<BSONObj>> AggregationRequest::parsePipelineFromBSON(
+ BSONElement pipelineElem) {
+ std::vector<BSONObj> pipeline;
if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) {
return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"};
}
- std::vector<BSONObj> pipeline;
+
for (auto elem : pipelineElem.Obj()) {
if (elem.type() != BSONType::Object) {
return {ErrorCodes::TypeMismatch,
@@ -81,7 +78,21 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
pipeline.push_back(elem.embeddedObject().getOwned());
}
- AggregationRequest request(std::move(nss), std::move(pipeline));
+ return std::move(pipeline);
+}
+
+StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
+ NamespaceString nss,
+ const BSONObj& cmdObj,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity) {
+ // Parse required parameters.
+ auto pipelineElem = cmdObj[kPipelineName];
+ auto pipeline = AggregationRequest::parsePipelineFromBSON(pipelineElem);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ AggregationRequest request(std::move(nss), std::move(pipeline.getValue()));
const std::initializer_list<StringData> optionsParsedElseWhere = {kPipelineName, kCommandName};
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 14b4dad0e59..4feaea931a8 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -61,6 +61,12 @@ public:
static constexpr long long kDefaultBatchSize = 101;
/**
+ * Parse an aggregation pipeline definition from 'pipelineElem'. Returns a non-OK status if
+ * pipeline is not an array or if any of the array elements are not objects.
+ */
+ static StatusWith<std::vector<BSONObj>> parsePipelineFromBSON(BSONElement pipelineElem);
+
+ /**
* Create a new instance of AggregationRequest by parsing the raw command object. Returns a
* non-OK status if a required field was missing, if there was an unrecognized field name or if
* there was a bad value for one of the fields.
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 4c68db3f1da..70e5dffaf2e 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -44,27 +44,63 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
+namespace {
+std::string pipelineToString(const vector<BSONObj>& pipeline) {
+ StringBuilder sb;
+ sb << "[";
+
+ auto first = true;
+ for (auto& stageSpec : pipeline) {
+ if (!first) {
+ sb << ", ";
+ } else {
+ first = false;
+ }
+ sb << stageSpec;
+ }
+ sb << "]";
+ return sb.str();
+}
+} // namespace
+
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
- std::string localField,
- std::string foreignField,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongod(pExpCtx),
- _fromNs(std::move(fromNs)),
- _as(std::move(as)),
- _localField(std::move(localField)),
- _foreignField(foreignField),
- _foreignFieldFieldName(std::move(foreignField)) {
+ : DocumentSourceNeedsMongod(pExpCtx), _fromNs(std::move(fromNs)), _as(std::move(as)) {
const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs);
_fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns);
_fromPipeline = resolvedNamespace.pipeline;
+ _resolvedNs = std::move(resolvedNamespace.ns);
+}
+DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
+ std::string as,
+ std::string localField,
+ std::string foreignField,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSourceLookUp(fromNs, as, pExpCtx) {
+ _localField = std::move(localField);
+ _foreignField = std::move(foreignField);
// We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage
// we'll eventually construct from the input document.
_fromPipeline.reserve(_fromPipeline.size() + 1);
_fromPipeline.push_back(BSONObj());
}
+DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
+ std::string as,
+ std::vector<BSONObj> pipeline,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSourceLookUp(fromNs, as, pExpCtx) {
+ // '_fromPipeline' will first be initialized by the constructor delegated to within this
+ // constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs'
+ // represents a view. We append the user 'pipeline' to the end of '_fromPipeline' to ensure any
+ // view prefix is not overwritten.
+ _fromPipeline.insert(_fromPipeline.end(), pipeline.begin(), pipeline.end());
+
+ _userPipeline = std::move(pipeline);
+}
+
std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> DocumentSourceLookUp::liteParse(
const AggregationRequest& request, const BSONElement& spec) {
uassert(40319,
@@ -121,15 +157,7 @@ BSONObj buildEqualityOrQuery(const std::string& fieldName, const vector<Value>&
DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
pExpCtx->checkForInterrupt();
- if (!_additionalFilter && _matchSrc) {
- // We have internalized a $match, but have not yet computed the descended $match that should
- // be applied to our queries.
- _additionalFilter = DocumentSourceMatch::descendMatchOnPath(
- _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx)
- ->getQuery();
- }
-
- if (_handlingUnwind) {
+ if (_unwindSrc) {
return unwindResult();
}
@@ -140,22 +168,27 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
auto inputDoc = nextInput.releaseDocument();
// If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
- // '_handlingUnwind' would be set to true, and we would not have made it here.
+ // '_unwindSrc' would be non-null, and we would not have made it here.
invariant(!_matchSrc);
- auto matchStage =
- makeMatchStageFromInput(inputDoc, _localField, _foreignFieldFieldName, BSONObj());
- // We've already allocated space for the trailing $match stage in '_fromPipeline'.
- _fromPipeline.back() = matchStage;
+ if (!wasConstructedWithPipelineSyntax()) {
+ auto matchStage =
+ makeMatchStageFromInput(inputDoc, *_localField, _foreignField->fullPath(), BSONObj());
+ // We've already allocated space for the trailing $match stage in '_fromPipeline'.
+ _fromPipeline.back() = matchStage;
+ }
+
auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
std::vector<Value> results;
int objsize = 0;
+
while (auto result = pipeline->getNext()) {
objsize += result->getApproximateSize();
uassert(4568,
- str::stream() << "Total size of documents in " << _fromNs.coll() << " matching "
- << matchStage
+ str::stream() << "Total size of documents in " << _fromNs.coll()
+ << " matching pipeline "
+ << getUserPipelineDefinition()
<< " exceeds maximum document size",
objsize <= BSONObjMaxInternalSize);
results.emplace_back(std::move(*result));
@@ -185,9 +218,8 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
// If we are not already handling an $unwind stage internally, we can combine with the
// following $unwind stage.
- if (nextUnwind && !_handlingUnwind && nextUnwind->getUnwindPath() == _as.fullPath()) {
+ if (nextUnwind && !_unwindSrc && nextUnwind->getUnwindPath() == _as.fullPath()) {
_unwindSrc = std::move(nextUnwind);
- _handlingUnwind = true;
container->erase(std::next(itr));
return itr;
}
@@ -199,7 +231,7 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
return std::next(itr);
}
- if (!_handlingUnwind || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) {
+ if (!_unwindSrc || _unwindSrc->indexPath() || _unwindSrc->preserveNullAndEmptyArrays()) {
// We must be unwinding our result to internalize a $match. For example, consider the
// following pipeline:
//
@@ -266,9 +298,8 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
}
// We can internalize the $match.
- if (!_handlingMatch) {
+ if (!_matchSrc) {
_matchSrc = nextMatch;
- _handlingMatch = true;
} else {
// We have already absorbed a $match. We need to join it with 'dependent'.
_matchSrc->joinMatchWith(nextMatch);
@@ -277,9 +308,30 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
// Remove the original $match. There may be further optimization between this $lookup and the
// new neighbor, so we return an iterator pointing to ourself.
container->erase(std::next(itr));
+
+ // We have internalized a $match, but have not yet computed the descended $match that should
+ // be applied to our queries.
+ _additionalFilter = DocumentSourceMatch::descendMatchOnPath(
+ _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx)
+ ->getQuery()
+ .getOwned();
+
+ if (wasConstructedWithPipelineSyntax()) {
+ auto matchObj = BSON("$match" << *_additionalFilter);
+ _fromPipeline.push_back(matchObj);
+ }
+
return itr;
}
+std::string DocumentSourceLookUp::getUserPipelineDefinition() {
+ if (wasConstructedWithPipelineSyntax()) {
+ return pipelineToString(_userPipeline);
+ }
+
+ return _fromPipeline.back().toString();
+}
+
void DocumentSourceLookUp::doDispose() {
if (_pipeline) {
_pipeline->dispose(pExpCtx->opCtx);
@@ -376,11 +428,13 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
_input = nextInput.releaseDocument();
- BSONObj filter = _additionalFilter.value_or(BSONObj());
- auto matchStage =
- makeMatchStageFromInput(*_input, _localField, _foreignFieldFieldName, filter);
- // We've already allocated space for the trailing $match stage in '_fromPipeline'.
- _fromPipeline.back() = matchStage;
+ if (!wasConstructedWithPipelineSyntax()) {
+ BSONObj filter = _additionalFilter.value_or(BSONObj());
+ auto matchStage =
+ makeMatchStageFromInput(*_input, *_localField, _foreignField->fullPath(), filter);
+ // We've already allocated space for the trailing $match stage in '_fromPipeline'.
+ _fromPipeline.back() = matchStage;
+ }
if (_pipeline) {
_pipeline->dispose(pExpCtx->opCtx);
@@ -427,13 +481,23 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
- MutableDocument output(DOC(
- getSourceName() << DOC("from" << _fromNs.coll() << "as" << _as.fullPath() << "localField"
- << _localField.fullPath()
- << "foreignField"
- << _foreignField.fullPath())));
+ Document doc;
+ if (wasConstructedWithPipelineSyntax()) {
+ doc = Document{{getSourceName(),
+ Document{{"from", _resolvedNs.coll()},
+ {"as", _as.fullPath()},
+ {"pipeline", _fromPipeline}}}};
+ } else {
+ doc = Document{{getSourceName(),
+ {Document{{"from", _fromNs.coll()},
+ {"as", _as.fullPath()},
+ {"localField", _localField->fullPath()},
+ {"foreignField", _foreignField->fullPath()}}}}};
+ }
+
+ MutableDocument output(doc);
if (explain) {
- if (_handlingUnwind) {
+ if (_unwindSrc) {
const boost::optional<FieldPath> indexPath = _unwindSrc->indexPath();
output[getSourceName()]["unwinding"] =
Value(DOC("preserveNullAndEmptyArrays"
@@ -442,24 +506,24 @@ void DocumentSourceLookUp::serializeToArray(
<< (indexPath ? Value(indexPath->fullPath()) : Value())));
}
- if (_matchSrc) {
+ // Only add _matchSrc for explain when $lookup was constructed with localField/foreignField
+ // syntax. For pipeline syntax, _matchSrc will be included as part of the pipeline
+ // definition.
+ if (!wasConstructedWithPipelineSyntax() && _additionalFilter) {
// Our output does not have to be parseable, so include a "matching" field with the
// descended match expression.
- output[getSourceName()]["matching"] =
- Value(DocumentSourceMatch::descendMatchOnPath(
- _matchSrc->getMatchExpression(), _as.fullPath(), pExpCtx)
- ->getQuery());
+ output[getSourceName()]["matching"] = Value(*_additionalFilter);
}
array.push_back(Value(output.freeze()));
} else {
array.push_back(Value(output.freeze()));
- if (_handlingUnwind) {
+ if (_unwindSrc) {
_unwindSrc->serializeToArray(array);
}
- if (_matchSrc) {
+ if (!wasConstructedWithPipelineSyntax() && _matchSrc) {
// '_matchSrc' tracks the originally specified $match. We descend upon the $match in the
// first call to getNext(), at which point we are confident that we no longer need to
// serialize the $lookup again.
@@ -469,7 +533,10 @@ void DocumentSourceLookUp::serializeToArray(
}
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
- deps->fields.insert(_localField.fullPath());
+ // As current pipeline syntax only supports non-correlated join, it precludes dependencies.
+ if (!wasConstructedWithPipelineSyntax()) {
+ deps->fields.insert(_localField->fullPath());
+ }
return SEE_NEXT;
}
@@ -503,13 +570,28 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
std::string as;
std::string localField;
std::string foreignField;
+ std::vector<BSONObj> pipeline;
+ bool hasPipeline = false;
for (auto&& argument : elem.Obj()) {
+ const auto argName = argument.fieldNameStringData();
+
+ if (argName == "pipeline") {
+ auto result = AggregationRequest::parsePipelineFromBSON(argument);
+ if (!result.isOK()) {
+ uasserted(40447,
+ str::stream() << "invalid $lookup pipeline definition: "
+ << result.getStatus().toString());
+ }
+ pipeline = std::move(result.getValue());
+ hasPipeline = true;
+ continue;
+ }
+
uassert(4570,
- str::stream() << "arguments to $lookup must be strings, " << argument << " is type "
+ str::stream() << "$lookup '" << argument << "' must be a string, is type "
<< argument.type(),
argument.type() == String);
- const auto argName = argument.fieldNameStringData();
if (argName == "from") {
fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
@@ -525,11 +607,27 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
}
}
- uassert(4572,
- "need to specify fields from, as, localField, and foreignField for a $lookup",
- !fromNs.ns().empty() && !as.empty() && !localField.empty() && !foreignField.empty());
+ uassert(40451, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
+ uassert(40449, "must specify 'as' field for a $lookup", !as.empty());
- return new DocumentSourceLookUp(
- std::move(fromNs), std::move(as), std::move(localField), std::move(foreignField), pExpCtx);
+ if (hasPipeline) {
+ uassert(40450,
+ "$lookup with 'pipeline' may not specify 'localField' or 'foreignField'",
+ localField.empty() && foreignField.empty());  // Reject if either one is present.
+
+ return new DocumentSourceLookUp(
+ std::move(fromNs), std::move(as), std::move(pipeline), pExpCtx);
+ } else {
+ uassert(4572,
+ "$lookup requires either 'pipeline' or both 'localField' and 'foreignField' to be "
+ "specified",
+ !localField.empty() && !foreignField.empty());
+
+ return new DocumentSourceLookUp(std::move(fromNs),
+ std::move(as),
+ std::move(localField),
+ std::move(foreignField),
+ pExpCtx);
+ }
}
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index e696e8bb4e5..50b02501189 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -103,9 +103,16 @@ public:
* Helper to absorb an $unwind stage. Only used for testing this special behavior.
*/
void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) {
- invariant(!_handlingUnwind);
+ invariant(!_unwindSrc);
_unwindSrc = unwind;
- _handlingUnwind = true;
+ }
+
+ /**
+ * Returns true if DocumentSourceLookUp was constructed with pipeline syntax (as opposed to
+ * localField/foreignField syntax).
+ */
+ bool wasConstructedWithPipelineSyntax() const {
+ return !static_cast<bool>(_localField);
}
protected:
@@ -119,41 +126,72 @@ protected:
Pipeline::SourceContainer* container) final;
private:
+ /**
+ * Target constructor. Handles common-field initialization for the syntax-specific delegating
+ * constructors.
+ */
+ DocumentSourceLookUp(NamespaceString fromNs,
+ std::string as,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Constructor used for a $lookup stage specified using the {from: ..., localField: ...,
+ * foreignField: ..., as: ...} syntax.
+ */
DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::string localField,
std::string foreignField,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as:
+ * ...} syntax.
+ */
+ DocumentSourceLookUp(NamespaceString fromNs,
+ std::string as,
+ std::vector<BSONObj> pipeline,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Should not be called; use serializeToArray instead.
+ */
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- // Should not be called; use serializeToArray instead.
MONGO_UNREACHABLE;
}
GetNextResult unwindResult();
+ /**
+ * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from the pipeline
+ * that is executed in that it will not include optimizations or resolved views.
+ */
+ std::string getUserPipelineDefinition();
+
NamespaceString _fromNs;
+ NamespaceString _resolvedNs;
FieldPath _as;
- FieldPath _localField;
- FieldPath _foreignField;
- std::string _foreignFieldFieldName;
boost::optional<BSONObj> _additionalFilter;
+ // For use when $lookup is specified with localField/foreignField syntax.
+ boost::optional<FieldPath> _localField;
+ boost::optional<FieldPath> _foreignField;
+
// The ExpressionContext used when performing aggregation pipelines against the '_fromNs'
// namespace.
boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
// The aggregation pipeline to perform against the '_fromNs' namespace.
std::vector<BSONObj> _fromPipeline;
+ // The pipeline defined with the user request, prior to optimization and addition of any view
+ // definitions.
+ std::vector<BSONObj> _userPipeline;
boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
- bool _handlingUnwind = false;
- bool _handlingMatch = false;
-
- // The following members are used to hold onto state across getNext() calls when
- // '_handlingUnwind' is true.
+ // The following members are used to hold onto state across getNext() calls when '_unwindSrc' is
+ // not null.
long long _cursorIndex = 0;
std::unique_ptr<Pipeline, Pipeline::Deleter> _pipeline;
boost::optional<Document> _input;
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 4e8a59bd75a..e43d8efb5b6 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -99,6 +99,106 @@ TEST_F(DocumentSourceLookUpTest, ShouldTruncateOutputSortOnSuffixOfAsField) {
ASSERT_EQUALS(outputSort.size(), 1U);
}
+TEST_F(DocumentSourceLookUpTest, AcceptsPipelineSyntax) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from"
+ << "coll"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax());
+}
+
+TEST_F(DocumentSourceLookUpTest, RejectsLocalFieldForeignFieldWhenPipelineIsSpecified) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ try {
+ auto lookupStage = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from"
+ << "coll"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))
+ << "localField"
+ << "a"
+ << "foreignField"
+ << "b"
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+
+ FAIL(str::stream()
+ << "Expected creation of the "
+ << lookupStage->getSourceName()
+ << " stage to uassert on mix of localField/foreignField and pipeline options");
+ } catch (const UserException& ex) {
+ ASSERT_EQ(40450, ex.getCode());
+ }
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldBeAbleToReParseSerializedStage) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto lookupStage = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from"
+ << "coll"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+
+ //
+ // Serialize the $lookup stage and confirm contents.
+ //
+ vector<Value> serialization;
+ lookupStage->serializeToArray(serialization);
+ ASSERT_EQ(serialization.size(), 1UL);
+ ASSERT_EQ(serialization[0].getType(), BSONType::Object);
+
+ // The fields are in no guaranteed order, so we can't perform a simple Document comparison.
+ auto serializedDoc = serialization[0].getDocument();
+ ASSERT_EQ(serializedDoc["$lookup"].getType(), BSONType::Object);
+
+ auto serializedStage = serializedDoc["$lookup"].getDocument();
+ ASSERT_EQ(serializedStage.size(), 3UL);
+ ASSERT_VALUE_EQ(serializedStage["from"], Value(std::string("coll")));
+ ASSERT_VALUE_EQ(serializedStage["as"], Value(std::string("as")));
+
+ ASSERT_EQ(serializedStage["pipeline"].getType(), BSONType::Array);
+ ASSERT_EQ(serializedStage["pipeline"].getArrayLength(), 1UL);
+
+ ASSERT_EQ(serializedStage["pipeline"][0].getType(), BSONType::Object);
+ ASSERT_DOCUMENT_EQ(serializedStage["pipeline"][0]["$match"].getDocument(),
+ Document(fromjson("{x: 1}")));
+
+ //
+ // Create a new $lookup stage from the serialization. Serialize the new stage and confirm that
+ // it is equivalent to the original serialization.
+ //
+ auto serializedBson = serializedDoc.toBson();
+ auto roundTripped = DocumentSourceLookUp::createFromBson(serializedBson.firstElement(), expCtx);
+
+ vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
TEST(MakeMatchStageFromInput, NonArrayValueUsesEqQuery) {
auto input = Document{{"local", 1}};
BSONObj matchStage = DocumentSourceLookUp::makeMatchStageFromInput(
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index e38263b9785..a0f50825b49 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -227,6 +227,21 @@ TEST(PipelineOptimizationTest, LookupShouldCoalesceWithUnwindOnAs) {
assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe);
}
+TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldCoalesceWithUnwindOnAs) {
+ string inputPipe =
+ "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}"
+ ",{$unwind: {path: '$same'}}"
+ "]";
+ string outputPipe =
+ "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: [], "
+ "unwinding: {preserveNullAndEmptyArrays: false}}}]";
+ string serializedPipe =
+ "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}"
+ ",{$unwind: {path: '$same'}}"
+ "]";
+ assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe);
+}
+
TEST(PipelineOptimizationTest, LookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty) {
string inputPipe =
"[{$lookup: {from : 'lookupColl', as : 'same', localField: 'left', foreignField: "
@@ -276,6 +291,18 @@ TEST(PipelineOptimizationTest, LookupShouldNotCoalesceWithUnwindNotOnAs) {
assertPipelineOptimizesTo(inputPipe, outputPipe);
}
+TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldNotCoalesceWithUnwindNotOnAs) {
+ string inputPipe =
+ "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}"
+ ",{$unwind: {path: '$from'}}"
+ "]";
+ string outputPipe =
+ "[{$lookup: {from : 'lookupColl', as : 'same', pipeline: []}}"
+ ",{$unwind: {path: '$from'}}"
+ "]";
+ assertPipelineOptimizesTo(inputPipe, outputPipe);
+}
+
TEST(PipelineOptimizationTest, LookupShouldSwapWithMatch) {
string inputPipe =
"[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: "
@@ -288,6 +315,16 @@ TEST(PipelineOptimizationTest, LookupShouldSwapWithMatch) {
assertPipelineOptimizesTo(inputPipe, outputPipe);
}
+TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldSwapWithMatch) {
+ string inputPipe =
+ "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}, "
+ " {$match: {'independent': 0}}]";
+ string outputPipe =
+ "[{$match: {independent: 0}}, "
+ " {$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}]";
+ assertPipelineOptimizesTo(inputPipe, outputPipe);
+}
+
TEST(PipelineOptimizationTest, LookupShouldSplitMatch) {
string inputPipe =
"[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: "
@@ -331,6 +368,22 @@ TEST(PipelineOptimizationTest, LookupShouldAbsorbUnwindMatch) {
assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe);
}
+TEST(PipelineOptimizationTest, LookupWithPipelineSyntaxShouldAbsorbUnwindMatch) {
+ string inputPipe =
+ "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: []}}, "
+ "{$unwind: '$asField'}, "
+ "{$match: {'asField.subfield': {$eq: 1}}}]";
+ string outputPipe =
+ "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: [{$match: {subfield: {$eq: "
+ "1}}}], "
+ "unwinding: {preserveNullAndEmptyArrays: false} } } ]";
+ string serializedPipe =
+ "[{$lookup: {from: 'lookupColl', as: 'asField', pipeline: [{$match: {subfield: {$eq: "
+ "1}}}]}}, "
+ "{$unwind: {path: '$asField'}}]";
+ assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe);
+}
+
TEST(PipelineOptimizationTest, LookupShouldAbsorbUnwindAndSplitAndAbsorbMatch) {
string inputPipe =
"[{$lookup: {from: 'lookupColl', as: 'asField', localField: 'y', foreignField: "