summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_lookup.cpp
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2017-05-25 16:34:41 -0400
committerJames Wahlin <james@mongodb.com>2017-06-05 15:19:44 -0400
commitcc6f3af6e1361c62f04a10596e86e651e1226525 (patch)
tree412eb486170f5dd3e009aa136994cabfde4736cc /src/mongo/db/pipeline/document_source_lookup.cpp
parent2daa02b7294412f2d5f2b7f224ef94f290f12f12 (diff)
downloadmongo-cc6f3af6e1361c62f04a10596e86e651e1226525.tar.gz
SERVER-29073 Allow variable definition within $lookup
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp156
1 files changed, 114 insertions, 42 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 57f0d6ecb66..0f09f98a7ea 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -67,11 +67,14 @@ std::string pipelineToString(const vector<BSONObj>& pipeline) {
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongod(pExpCtx), _fromNs(std::move(fromNs)), _as(std::move(as)) {
+ : DocumentSourceNeedsMongod(pExpCtx),
+ _fromNs(std::move(fromNs)),
+ _as(std::move(as)),
+ _variablesParseState(_variables.useIdGenerator()) {
const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs);
- _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns);
- _fromPipeline = resolvedNamespace.pipeline;
- _resolvedNs = std::move(resolvedNamespace.ns);
+ _resolvedNs = resolvedNamespace.ns;
+ _resolvedPipeline = resolvedNamespace.pipeline;
+ _fromExpCtx = pExpCtx->copyWith(_resolvedNs);
}
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
@@ -82,45 +85,58 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
: DocumentSourceLookUp(fromNs, as, pExpCtx) {
_localField = std::move(localField);
_foreignField = std::move(foreignField);
- // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage
+ // We append an additional BSONObj to '_resolvedPipeline' as a placeholder for the $match stage
// we'll eventually construct from the input document.
- _fromPipeline.reserve(_fromPipeline.size() + 1);
- _fromPipeline.push_back(BSONObj());
+ _resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
+ _resolvedPipeline.push_back(BSONObj());
}
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::vector<BSONObj> pipeline,
+ BSONObj letVariables,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSourceLookUp(fromNs, as, pExpCtx) {
- // '_fromPipeline' will first be initialized by the constructor delegated to within this
+ // '_resolvedPipeline' will first be initialized by the constructor delegated to within this
// constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs'
- // represents a view. We append the user 'pipeline' to the end of '_fromPipeline' to ensure any
- // view prefix is not overwritten.
- _fromPipeline.insert(_fromPipeline.end(), pipeline.begin(), pipeline.end());
+ // represents a view. We append the user 'pipeline' to the end of '_resolvedPipeline' to ensure
+ // any view prefix is not overwritten.
+ _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
_userPipeline = std::move(pipeline);
+
+ for (auto&& varElem : letVariables) {
+ const auto varName = varElem.fieldNameStringData();
+ Variables::uassertValidNameForUserWrite(varName);
+
+ _letVariables.emplace_back(
+ varName.toString(),
+ Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
+ _variablesParseState.defineVariable(varName));
+ }
}
std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> DocumentSourceLookUp::liteParse(
const AggregationRequest& request, const BSONElement& spec) {
- uassert(40319,
+ uassert(ErrorCodes::FailedToParse,
str::stream() << "the $lookup stage specification must be an object, but found "
<< typeName(spec.type()),
spec.type() == BSONType::Object);
auto specObj = spec.Obj();
auto fromElement = specObj["from"];
- uassert(40320,
+ uassert(ErrorCodes::FailedToParse,
str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
fromElement);
- uassert(40321,
+ uassert(ErrorCodes::FailedToParse,
str::stream() << "'from' option to $lookup must be a string, but was type "
<< typeName(specObj["from"].type()),
fromElement.type() == BSONType::String);
NamespaceString nss(request.getNamespaceString().db(), fromElement.valueStringData());
- uassert(40322, str::stream() << "invalid $lookup namespace: " << nss.ns(), nss.isValid());
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "invalid $lookup namespace: " << nss.ns(),
+ nss.isValid());
return stdx::make_unique<LiteParsedDocumentSourceOneForeignCollection>(std::move(nss));
}
@@ -166,7 +182,10 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
if (!nextInput.isAdvanced()) {
return nextInput;
}
+
auto inputDoc = nextInput.releaseDocument();
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+ resolveLetVariables(inputDoc, &_fromExpCtx->variables);
// If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
// '_unwindSrc' would be non-null, and we would not have made it here.
@@ -175,11 +194,11 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
if (!wasConstructedWithPipelineSyntax()) {
auto matchStage =
makeMatchStageFromInput(inputDoc, *_localField, _foreignField->fullPath(), BSONObj());
- // We've already allocated space for the trailing $match stage in '_fromPipeline'.
- _fromPipeline.back() = matchStage;
+ // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
+ _resolvedPipeline.back() = matchStage;
}
- auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
+ auto pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
std::vector<Value> results;
int objsize = 0;
@@ -319,7 +338,7 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
if (wasConstructedWithPipelineSyntax()) {
auto matchObj = BSON("$match" << *_additionalFilter);
- _fromPipeline.push_back(matchObj);
+ _resolvedPipeline.push_back(matchObj);
}
return itr;
@@ -330,7 +349,7 @@ std::string DocumentSourceLookUp::getUserPipelineDefinition() {
return pipelineToString(_userPipeline);
}
- return _fromPipeline.back().toString();
+ return _resolvedPipeline.back().toString();
}
void DocumentSourceLookUp::doDispose() {
@@ -351,7 +370,7 @@ BSONObj DocumentSourceLookUp::makeMatchStageFromInput(const Document& input,
bool containsRegex = false;
document_path_support::visitAllValuesAtPath(input, localFieldPath, [&](const Value& nextValue) {
arrBuilder << nextValue;
- if (!containsRegex && nextValue.getType() == RegEx) {
+ if (!containsRegex && nextValue.getType() == BSONType::RegEx) {
containsRegex = true;
}
});
@@ -442,14 +461,17 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
BSONObj filter = _additionalFilter.value_or(BSONObj());
auto matchStage =
makeMatchStageFromInput(*_input, *_localField, _foreignField->fullPath(), filter);
- // We've already allocated space for the trailing $match stage in '_fromPipeline'.
- _fromPipeline.back() = matchStage;
+ // We've already allocated space for the trailing $match stage in '_resolvedPipeline'.
+ _resolvedPipeline.back() = matchStage;
}
if (_pipeline) {
_pipeline->dispose(pExpCtx->opCtx);
}
- _pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx));
+
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+ resolveLetVariables(*_input, &_fromExpCtx->variables);
+ _pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
// The $lookup stage takes responsibility for disposing of its Pipeline, since it will
// potentially be used by multiple OperationContexts, and the $lookup stage is part of an
@@ -489,14 +511,37 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
return output.freeze();
}
+void DocumentSourceLookUp::copyVariablesToExpCtx(const Variables& vars,
+ const VariablesParseState& vps,
+ ExpressionContext* expCtx) {
+ expCtx->variables = vars;
+ expCtx->variablesParseState = vps.copyWith(expCtx->variables.useIdGenerator());
+}
+
+void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variables* variables) {
+ invariant(variables);
+
+ for (auto& letVar : _letVariables) {
+ auto value = letVar.expression->evaluate(localDoc);
+ variables->setValue(letVar.id, value);
+ }
+}
+
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
if (wasConstructedWithPipelineSyntax()) {
+ MutableDocument exprList;
+ for (auto letVar : _letVariables) {
+ exprList.addField(letVar.name,
+ letVar.expression->serialize(static_cast<bool>(explain)));
+ }
+
doc = Document{{getSourceName(),
Document{{"from", _resolvedNs.coll()},
{"as", _as.fullPath()},
- {"pipeline", _fromPipeline}}}};
+ {"let", exprList.freeze()},
+ {"pipeline", _resolvedPipeline}}}};
} else {
doc = Document{{getSourceName(),
{Document{{"from", _fromNs.coll()},
@@ -543,8 +588,11 @@ void DocumentSourceLookUp::serializeToArray(
}
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
- // As current pipeline syntax only supports non-correlated join, it precludes dependencies.
- if (!wasConstructedWithPipelineSyntax()) {
+ if (wasConstructedWithPipelineSyntax()) {
+ for (auto&& letVar : _letVariables) {
+ letVar.expression->addDependencies(deps);
+ }
+ } else {
deps->fields.insert(_localField->fullPath());
}
return SEE_NEXT;
@@ -556,7 +604,7 @@ void DocumentSourceLookUp::doDetachFromOperationContext() {
// use Pipeline::detachFromOperationContext() to take care of updating '_fromExpCtx->opCtx'.
_pipeline->detachFromOperationContext();
invariant(_fromExpCtx->opCtx == nullptr);
- } else {
+ } else if (_fromExpCtx) {
_fromExpCtx->opCtx = nullptr;
}
}
@@ -567,21 +615,27 @@ void DocumentSourceLookUp::doReattachToOperationContext(OperationContext* opCtx)
// use Pipeline::reattachToOperationContext() to take care of updating '_fromExpCtx->opCtx'.
_pipeline->reattachToOperationContext(opCtx);
invariant(_fromExpCtx->opCtx == opCtx);
- } else {
+ } else if (_fromExpCtx) {
_fromExpCtx->opCtx = opCtx;
}
}
intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
- uassert(4569, "the $lookup specification must be an Object", elem.type() == Object);
+ uassert(ErrorCodes::FailedToParse,
+ "the $lookup specification must be an Object",
+ elem.type() == BSONType::Object);
NamespaceString fromNs;
std::string as;
+
std::string localField;
std::string foreignField;
+
+ BSONObj letVariables;
std::vector<BSONObj> pipeline;
bool hasPipeline = false;
+ bool hasLet = false;
for (auto&& argument : elem.Obj()) {
const auto argName = argument.fieldNameStringData();
@@ -589,7 +643,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
if (argName == "pipeline") {
auto result = AggregationRequest::parsePipelineFromBSON(argument);
if (!result.isOK()) {
- uasserted(40447,
+ uasserted(ErrorCodes::FailedToParse,
str::stream() << "invalid $lookup pipeline definition: "
<< result.getStatus().toString());
}
@@ -598,10 +652,21 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
continue;
}
- uassert(4570,
- str::stream() << "$lookup '" << argument << "' must be a string, is type "
+ if (argName == "let") {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$lookup argument '" << argument
+ << "' must be an object, is type "
+ << argument.type(),
+ argument.type() == BSONType::Object);
+ letVariables = argument.Obj();
+ hasLet = true;
+ continue;
+ }
+
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$lookup argument '" << argument << "' must be a string, is type "
<< argument.type(),
- argument.type() == String);
+ argument.type() == BSONType::String);
if (argName == "from") {
fromNs = NamespaceString(pExpCtx->ns.db().toString() + '.' + argument.String());
@@ -612,26 +677,33 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
} else if (argName == "foreignField") {
foreignField = argument.String();
} else {
- uasserted(4571,
+ uasserted(ErrorCodes::FailedToParse,
str::stream() << "unknown argument to $lookup: " << argument.fieldName());
}
}
- uassert(40451, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
- uassert(40449, "must specify 'as' field for a $lookup", !as.empty());
+ uassert(
+ ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
+ uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
if (hasPipeline) {
- uassert(40450,
+ uassert(ErrorCodes::FailedToParse,
"$lookup with 'pipeline' may not specify 'localField' or 'foreignField'",
- localField.empty() || foreignField.empty());
+ localField.empty() && foreignField.empty());
- return new DocumentSourceLookUp(
- std::move(fromNs), std::move(as), std::move(pipeline), pExpCtx);
+ return new DocumentSourceLookUp(std::move(fromNs),
+ std::move(as),
+ std::move(pipeline),
+ std::move(letVariables),
+ pExpCtx);
} else {
- uassert(4572,
+ uassert(ErrorCodes::FailedToParse,
"$lookup requires either 'pipeline' or both 'localField' and 'foreignField' to be "
"specified",
!localField.empty() && !foreignField.empty());
+ uassert(ErrorCodes::FailedToParse,
+ "$lookup with a 'let' argument must also specify 'pipeline'",
+ !hasLet);
return new DocumentSourceLookUp(std::move(fromNs),
std::move(as),