summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_lookup.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-09-21 02:22:00 -0400
committerBernard Gorman <bernard.gorman@gmail.com>2017-09-24 15:59:10 -0400
commit21f5028d4404ea9584d53bdb81dd63ca8a505d6f (patch)
treeec6e458b171de330c0e4756603ba95ba3df06834 /src/mongo/db/pipeline/document_source_lookup.cpp
parent280981d3a4cadeb91da9fd69864924e61d7ef99a (diff)
downloadmongo-21f5028d4404ea9584d53bdb81dd63ca8a505d6f.tar.gz
SERVER-30399 Add caching for $lookup non-correlated sub-pipeline prefix
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp79
1 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index b4a41b1b2da..21f3151f59c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/query_knobs.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -106,6 +107,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
_userPipeline = std::move(pipeline);
+ _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
+
for (auto&& varElem : letVariables) {
const auto varName = varElem.fieldNameStringData();
Variables::uassertValidNameForUserWrite(varName);
@@ -203,8 +206,6 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
}
auto inputDoc = nextInput.releaseDocument();
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- resolveLetVariables(inputDoc, &_fromExpCtx->variables);
// If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
// '_unwindSrc' would be non-null, and we would not have made it here.
@@ -217,7 +218,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
_resolvedPipeline.back() = matchStage;
}
- auto pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ auto pipeline = buildPipeline(inputDoc);
std::vector<Value> results;
int objsize = 0;
@@ -238,6 +239,53 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
return output.freeze();
}
+std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline(
+ const Document& inputDoc) {
+ // Copy all 'let' variables into the foreign pipeline's expression context.
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+
+ // Resolve the 'let' variables to values per the given input document.
+ resolveLetVariables(inputDoc, &_fromExpCtx->variables);
+
+ // If we don't have a cache, build and return the pipeline immediately.
+ if (!_cache || _cache->isAbandoned()) {
+ return uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ }
+
+ // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
+ // cursor source. If the cache is present and serving, then we will not be adding a cursor
+ // source later, so inject a mongod interface into all stages that need one.
+ MongodInterface::MakePipelineOptions pipelineOpts;
+
+ pipelineOpts.optimize = false;
+ pipelineOpts.attachCursorSource = false;
+ pipelineOpts.forceInjectMongod = _cache->isServing();
+
+ // Construct the basic pipeline without a cache stage.
+ auto pipeline =
+ uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+
+ // Add the cache stage at the end and optimize. During the optimization process, the cache will
+ // either move itself to the correct position in the pipeline, or will abandon itself if no
+ // suitable cache position exists.
+ pipeline->addFinalSource(
+ DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr()));
+
+ pipeline->optimizePipeline();
+
+ if (!_cache->isServing()) {
+ // The cache has either been abandoned or has not yet been built. Attach a cursor.
+ uassertStatusOK(_mongod->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get()));
+ }
+
+ // If the cache has been abandoned, release it.
+ if (_cache->isAbandoned()) {
+ _cache.reset();
+ }
+
+ return pipeline;
+}
+
DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const {
std::set<std::string> modifiedPaths{_as.fullPath()};
if (_unwindSrc) {
@@ -488,9 +536,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
_pipeline->dispose(pExpCtx->opCtx);
}
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- resolveLetVariables(*_input, &_fromExpCtx->variables);
- _pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ _pipeline = buildPipeline(*_input);
// The $lookup stage takes responsibility for disposing of its Pipeline, since it will
// potentially be used by multiple OperationContexts, and the $lookup stage is part of an
@@ -613,9 +659,30 @@ void DocumentSourceLookUp::serializeToArray(
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
if (wasConstructedWithPipelineSyntax()) {
+ // Copy all 'let' variables into the foreign pipeline's expression context.
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+
+ DepsTracker subDeps(deps->getMetadataAvailable());
+
+ // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
+ // declared by this $lookup and variables declared externally.
+ for (auto&& source : pipeline->getSources()) {
+ source->getDependencies(&subDeps);
+ }
+
+ // Add the 'let' dependencies to the tracker. Because the caller is only interested in
+ // references to external variables, filter out any subpipeline references to 'let'
+ // variables declared by this $lookup.
for (auto&& letVar : _letVariables) {
letVar.expression->addDependencies(deps);
+ subDeps.vars.erase(letVar.id);
}
+
+ // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer
+ // to the fields from the foreign collection rather than the local collection.
+ deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end());
} else {
deps->fields.insert(_localField->fullPath());
}