/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/lookup_set_cache.h"
#include "mongo/db/pipeline/value_comparator.h"
namespace mongo {
/**
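* Queries a separate collection for equality matches with documents in the pipeline collection
* or, when constructed with pipeline syntax, by running a sub-pipeline against that collection.
* Adds the matching documents to a new array field (the 'as' field) in the input document.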
*/
class DocumentSourceLookUp final : public DocumentSource, public NeedsMergerDocumentSource {
public:
// NOTE(review): template argument lists appear to be missing throughout this class (e.g.
// 'std::unique_ptr parse', 'boost::optional _cache', 'static_cast(...)'). Confirm against the
// upstream file before relying on the exact types named below.

// NOTE(review): presumably the maximum permitted nesting depth of $lookup sub-pipelines; the
// place where it is enforced is not visible in this header.
static constexpr size_t kMaxSubPipelineDepth = 20;
/**
* Lightweight description of a $lookup stage. Holds the 'from' namespace, the set of foreign
* namespaces involved and, optionally, a lite-parsed form of the sub-pipeline, and answers
* namespace and privilege queries from that state alone.
*/
class LiteParsed final : public LiteParsedDocumentSource {
public:
/**
* Parses the stage specification 'spec' belonging to 'request'. Defined out of line.
*/
static std::unique_ptr parse(const AggregationRequest& request,
const BSONElement& spec);
/**
* Stores the given namespace, namespace set and optional lite-parsed pipeline; each argument is
* taken by value and moved into the corresponding member.
*/
LiteParsed(NamespaceString fromNss,
stdx::unordered_set foreignNssSet,
boost::optional liteParsedPipeline)
: _fromNss{std::move(fromNss)},
_foreignNssSet(std::move(foreignNssSet)),
_liteParsedPipeline(std::move(liteParsedPipeline)) {}
/**
* Returns a copy of the set of foreign namespaces supplied at construction.
*/
stdx::unordered_set getInvolvedNamespaces() const final {
return {_foreignNssSet};
}
/**
* Returns the privileges needed to run this stage: the 'find' action on exactly the '_fromNss'
* namespace, plus the privileges required by the lite-parsed sub-pipeline when one is present.
*/
PrivilegeVector requiredPrivileges(bool isMongos) const final {
PrivilegeVector requiredPrivileges;
Privilege::addPrivilegeToPrivilegeVector(
&requiredPrivileges,
Privilege(ResourcePattern::forExactNamespace(_fromNss), ActionType::find));
if (_liteParsedPipeline) {
Privilege::addPrivilegesToPrivilegeVector(
&requiredPrivileges, _liteParsedPipeline->requiredPrivileges(isMongos));
}
return requiredPrivileges;
}
/**
* Lookup from a sharded collection is not allowed: returns false when 'nss' is one of this
* stage's foreign namespaces, and true for any other namespace.
*/
bool allowShardedForeignCollection(NamespaceString nss) const final {
return (_foreignNssSet.find(nss) == _foreignNssSet.end());
}
private:
// The namespace on which the 'find' privilege is required.
const NamespaceString _fromNss;
// The namespaces reported by getInvolvedNamespaces() and checked by
// allowShardedForeignCollection().
const stdx::unordered_set _foreignNssSet;
// Set only when a lite-parsed sub-pipeline was supplied at construction.
const boost::optional _liteParsedPipeline;
};
// The following overrides are declared here and defined out of line.
GetNextResult getNext() final;
const char* getSourceName() const final;
// Serialization entry point for this stage; serialize() below must not be called.
void serializeToArray(
std::vector& array,
boost::optional explain = boost::none) const final;
/**
* Returns the 'as' path, and possibly fields modified by an absorbed $unwind.
*/
GetModPathsReturn getModifiedPaths() const final;
/**
* Reports the StageConstraints of this $lookup instance. A $lookup constructed with pipeline
* syntax will inherit certain constraints from the stages in its pipeline.
*/
StageConstraints constraints(Pipeline::SplitState) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
/**
* Returns the sort set of the preceding source ('pSource') after passing it through
* DocumentSource::truncateSortSet() together with the full 'as' path.
* NOTE(review): presumably sorts that depend on the 'as' field are dropped or shortened because
* this stage writes that field; confirm against truncateSortSet().
*/
BSONObjSet getOutputSorts() final {
return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
}
/**
* Always returns null: this stage supplies no shard-side source.
*/
boost::intrusive_ptr getShardSource() final {
return nullptr;
}
/**
* Returns a MergingLogic built from this stage itself.
* NOTE(review): together with the null shard source this presumably means the whole stage runs
* on the merging side; confirm against the MergingLogic definition.
*/
MergingLogic mergingLogic() final {
return {this};
}
/**
* Appends the 'from' namespace ('_fromNs') to 'collections'.
*/
void addInvolvedCollections(std::vector* collections) const final {
collections->push_back(_fromNs);
}
void detachFromOperationContext() final;
void reattachToOperationContext(OperationContext* opCtx) final;
bool usedDisk() final;
/**
* Creates a stage from the BSON specification 'elem'. Defined out of line.
*/
static boost::intrusive_ptr createFromBson(
BSONElement elem, const boost::intrusive_ptr& pExpCtx);
/**
* Creates a stage via createFromBson() and then re-initializes its cache with a maximum size of
* 'maxCacheSizeBytes'. Because reInitializeCache() invariants on pipeline syntax, 'elem' must
* describe a $lookup that uses pipeline syntax.
*/
static boost::intrusive_ptr createFromBsonWithCacheSize(
BSONElement elem,
const boost::intrusive_ptr& pExpCtx,
size_t maxCacheSizeBytes) {
auto dsLookup = createFromBson(elem, pExpCtx);
static_cast(dsLookup.get())->reInitializeCache(maxCacheSizeBytes);
return dsLookup;
}
/**
* Builds the BSONObj used to query the foreign collection and wraps it in a $match.
*/
static BSONObj makeMatchStageFromInput(const Document& input,
const FieldPath& localFieldName,
const std::string& foreignFieldName,
const BSONObj& additionalFilter);
/**
* Helper to absorb an $unwind stage. Only used for testing this special behavior. Invariants
* that no $unwind stage has been absorbed already.
*/
void setUnwindStage(const boost::intrusive_ptr& unwind) {
invariant(!_unwindSrc);
_unwindSrc = unwind;
}
/**
* Returns true if DocumentSourceLookUp was constructed with pipeline syntax (as opposed to
* localField/foreignField syntax). This is detected by '_localField' being unset.
*/
bool wasConstructedWithPipelineSyntax() const {
return !static_cast(_localField);
}
// Test-only accessors exposing internal state.
const Variables& getVariables_forTest() {
return _variables;
}
const VariablesParseState& getVariablesParseState_forTest() {
return _variablesParseState;
}
// Test-only: builds the sub-pipeline for 'inputDoc' by delegating to buildPipeline().
std::unique_ptr getSubPipeline_forTest(const Document& inputDoc) {
return buildPipeline(inputDoc);
}
protected:
void doDispose() final;
/**
* Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc'
* field.
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
private:
/**
* Bundles the name, expression and Variables id of a single 'let' variable.
*/
struct LetVariable {
LetVariable(std::string name, boost::intrusive_ptr expression, Variables::Id id)
: name(std::move(name)), expression(std::move(expression)), id(id) {}
std::string name;
boost::intrusive_ptr expression;
Variables::Id id;
};
/**
* Target constructor. Handles common-field initialization for the syntax-specific delegating
* constructors.
*/
DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr& pExpCtx);
/**
* Constructor used for a $lookup stage specified using the {from: ..., localField: ...,
* foreignField: ..., as: ...} syntax.
*/
DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::string localField,
std::string foreignField,
const boost::intrusive_ptr& pExpCtx);
/**
* Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as:
* ...} syntax.
*/
DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::vector pipeline,
BSONObj letVariables,
const boost::intrusive_ptr& pExpCtx);
/**
* Should not be called; use serializeToArray instead. Reaching this function hits
* MONGO_UNREACHABLE.
*/
Value serialize(boost::optional explain = boost::none) const final {
MONGO_UNREACHABLE;
}
// NOTE(review): presumably produces results when an $unwind stage has been absorbed
// ('_unwindSrc' is set), using the cross-call state declared at the end of this class; confirm
// against the implementation.
GetNextResult unwindResult();
/**
* Copies 'vars' and 'vps' to the Variables and VariablesParseState objects in 'expCtx'. These
* copies provide access to 'let' defined variables in sub-pipeline execution.
*/
static void copyVariablesToExpCtx(const Variables& vars,
const VariablesParseState& vps,
ExpressionContext* expCtx);
/**
* Resolves let defined variables against 'localDoc' and stores the results in 'variables'.
*/
void resolveLetVariables(const Document& localDoc, Variables* variables);
/**
* Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
* pipelines will be built recursively.
*/
void initializeIntrospectionPipeline();
/**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
* cursor and/or cache source as appropriate.
*/
std::unique_ptr buildPipeline(const Document& inputDoc);
/**
* Returns, as a string, the pipeline supplied via the $lookup 'pipeline' argument. This may
* differ from the pipeline that is executed in that it will not include optimizations or
* resolved views.
*/
std::string getUserPipelineDefinition();
/**
* Reinitialize the cache with a new max size. May only be called if this DSLookup was created
* with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added
* to it.
*/
void reInitializeCache(size_t maxCacheSizeBytes) {
invariant(wasConstructedWithPipelineSyntax());
// An existing cache may only be replaced while it is still building and still empty.
invariant(!_cache || (_cache->isBuilding() && _cache->sizeBytes() == 0));
_cache.emplace(maxCacheSizeBytes);
}
// NOTE(review): presumably the value reported by usedDisk(); that function is defined out of
// line, so confirm there.
bool _usedDisk = false;
// The 'from' namespace; this is what addInvolvedCollections() reports.
NamespaceString _fromNs;
// The namespace that the sub-pipelines are actually run against (see '_fromExpCtx' and
// '_resolvedPipeline' below).
NamespaceString _resolvedNs;
// The 'as' path: the output field of this stage.
FieldPath _as;
// NOTE(review): presumably an extra filter passed to makeMatchStageFromInput() as
// 'additionalFilter'; how it gets set is not visible in this header.
boost::optional _additionalFilter;
// For use when $lookup is specified with localField/foreignField syntax.
boost::optional _localField;
boost::optional _foreignField;
// Holds 'let' defined variables defined both in this stage and in parent pipelines. These are
// copied to the '_fromExpCtx' ExpressionContext's 'variables' and 'variablesParseState' for use
// in foreign pipeline execution.
Variables _variables;
VariablesParseState _variablesParseState;
// Caches documents returned by the non-correlated prefix of the $lookup pipeline during the
// first iteration, up to a specified size limit in bytes. If this limit is not exceeded by the
// time we hit EOF, subsequent iterations of the pipeline will draw from the cache rather than
// from a cursor source.
boost::optional _cache;
// The ExpressionContext used when performing aggregation pipelines against the '_resolvedNs'
// namespace.
boost::intrusive_ptr _fromExpCtx;
// The aggregation pipeline to perform against the '_resolvedNs' namespace. Referenced view
// namespaces have been resolved.
std::vector _resolvedPipeline;
// The aggregation pipeline defined with the user request, prior to optimization and view
// resolution.
std::vector _userPipeline;
// A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
// functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
std::unique_ptr _parsedIntrospectionPipeline;
// The 'let' variables of this stage, one LetVariable entry per variable.
std::vector _letVariables;
// NOTE(review): presumably the $match stage built for localField/foreignField lookups; confirm
// against the implementation.
boost::intrusive_ptr _matchSrc;
// The absorbed $unwind stage, if any; set by doOptimizeAt() or setUnwindStage().
boost::intrusive_ptr _unwindSrc;
// The following members are used to hold onto state across getNext() calls when '_unwindSrc' is
// not null.
long long _cursorIndex = 0;
std::unique_ptr _pipeline;
boost::optional _input;
boost::optional _nextValue;
};
} // namespace mongo