diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-21 02:22:00 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-24 15:59:10 -0400 |
commit | 21f5028d4404ea9584d53bdb81dd63ca8a505d6f (patch) | |
tree | ec6e458b171de330c0e4756603ba95ba3df06834 /src/mongo/db/pipeline/document_source_lookup.cpp | |
parent | 280981d3a4cadeb91da9fd69864924e61d7ef99a (diff) | |
download | mongo-21f5028d4404ea9584d53bdb81dd63ca8a505d6f.tar.gz |
SERVER-30399 Add caching for $lookup non-correlated sub-pipeline prefix
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 79 |
1 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index b4a41b1b2da..21f3151f59c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value.h" +#include "mongo/db/query/query_knobs.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -106,6 +107,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, _userPipeline = std::move(pipeline); + _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load()); + for (auto&& varElem : letVariables) { const auto varName = varElem.fieldNameStringData(); Variables::uassertValidNameForUserWrite(varName); @@ -203,8 +206,6 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { } auto inputDoc = nextInput.releaseDocument(); - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - resolveLetVariables(inputDoc, &_fromExpCtx->variables); // If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind, // '_unwindSrc' would be non-null, and we would not have made it here. @@ -217,7 +218,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { _resolvedPipeline.back() = matchStage; } - auto pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + auto pipeline = buildPipeline(inputDoc); std::vector<Value> results; int objsize = 0; @@ -238,6 +239,53 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { return output.freeze(); } +std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline( + const Document& inputDoc) { + // Copy all 'let' variables into the foreign pipeline's expression context. + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + + // Resolve the 'let' variables to values per the given input document. + resolveLetVariables(inputDoc, &_fromExpCtx->variables); + + // If we don't have a cache, build and return the pipeline immediately. + if (!_cache || _cache->isAbandoned()) { + return uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + } + + // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a + // cursor source. If the cache is present and serving, then we will not be adding a cursor + // source later, so inject a mongod interface into all stages that need one. + MongodInterface::MakePipelineOptions pipelineOpts; + + pipelineOpts.optimize = false; + pipelineOpts.attachCursorSource = false; + pipelineOpts.forceInjectMongod = _cache->isServing(); + + // Construct the basic pipeline without a cache stage. + auto pipeline = + uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + + // Add the cache stage at the end and optimize. During the optimization process, the cache will + // either move itself to the correct position in the pipeline, or will abandon itself if no + // suitable cache position exists. + pipeline->addFinalSource( + DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr())); + + pipeline->optimizePipeline(); + + if (!_cache->isServing()) { + // The cache has either been abandoned or has not yet been built. Attach a cursor. + uassertStatusOK(_mongod->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get())); + } + + // If the cache has been abandoned, release it. + if (_cache->isAbandoned()) { + _cache.reset(); + } + + return pipeline; +} + DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const { std::set<std::string> modifiedPaths{_as.fullPath()}; if (_unwindSrc) { @@ -488,9 +536,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() { _pipeline->dispose(pExpCtx->opCtx); } - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - resolveLetVariables(*_input, &_fromExpCtx->variables); - _pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + _pipeline = buildPipeline(*_input); // The $lookup stage takes responsibility for disposing of its Pipeline, since it will // potentially be used by multiple OperationContexts, and the $lookup stage is part of an @@ -613,9 +659,30 @@ void DocumentSourceLookUp::serializeToArray( DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { if (wasConstructedWithPipelineSyntax()) { + // Copy all 'let' variables into the foreign pipeline's expression context. + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + + auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + + DepsTracker subDeps(deps->getMetadataAvailable()); + + // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables + // declared by this $lookup and variables declared externally. + for (auto&& source : pipeline->getSources()) { + source->getDependencies(&subDeps); + } + + // Add the 'let' dependencies to the tracker. Because the caller is only interested in + // references to external variables, filter out any subpipeline references to 'let' + // variables declared by this $lookup. for (auto&& letVar : _letVariables) { letVar.expression->addDependencies(deps); + subDeps.vars.erase(letVar.id); } + + // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer + // to the fields from the foreign collection rather than the local collection. + deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end()); } else { deps->fields.insert(_localField->fullPath()); } |