summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript4
-rw-r--r--src/mongo/db/pipeline/dependencies.h18
-rw-r--r--src/mongo/db/pipeline/document_source.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source.h37
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp79
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp419
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.cpp147
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h85
-rw-r--r--src/mongo/db/pipeline/expression.cpp44
-rw-r--r--src/mongo/db/pipeline/expression.h104
-rw-r--r--src/mongo/db/pipeline/expression_test.cpp62
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp30
-rw-r--r--src/mongo/db/pipeline/pipeline.h4
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp46
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h5
-rw-r--r--src/mongo/db/pipeline/sequential_document_cache.cpp88
-rw-r--r--src/mongo/db/pipeline/sequential_document_cache.h154
-rw-r--r--src/mongo/db/pipeline/sequential_document_cache_test.cpp163
-rw-r--r--src/mongo/db/pipeline/stub_mongod_interface.h8
-rw-r--r--src/mongo/db/pipeline/variables.cpp14
-rw-r--r--src/mongo/db/pipeline/variables.h16
27 files changed, 1540 insertions, 108 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index f735ce230b4..6266ae8a199 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -150,9 +150,11 @@ env.CppUnitTest(
'document_source_sort_test.cpp',
'document_source_test.cpp',
'document_source_unwind_test.cpp',
+ 'sequential_document_cache_test.cpp',
],
LIBDEPS=[
'document_source',
+ 'document_source_facet',
'document_source_lookup',
'document_value_test_util',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
@@ -255,11 +257,13 @@ docSourceEnv.Library(
'document_source_replace_root.cpp',
'document_source_sample.cpp',
'document_source_sample_from_random_cursor.cpp',
+ 'document_source_sequential_document_cache.cpp',
'document_source_single_document_transformation.cpp',
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
'document_source_unwind.cpp',
+ 'sequential_document_cache.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver',
diff --git a/src/mongo/db/pipeline/dependencies.h b/src/mongo/db/pipeline/dependencies.h
index 19cf3ee33f8..133fc1d9893 100644
--- a/src/mongo/db/pipeline/dependencies.h
+++ b/src/mongo/db/pipeline/dependencies.h
@@ -33,6 +33,7 @@
#include <string>
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/variables.h"
namespace mongo {
class ParsedDeps;
@@ -60,6 +61,16 @@ struct DepsTracker {
return fields.empty() && !needWholeDocument && !_needTextScore;
}
+ /**
+ * Returns 'true' if any of the DepsTracker's variables appear in the passed 'ids' set.
+ */
+ bool hasVariableReferenceTo(const std::set<Variables::Id>& ids) const {
+ std::vector<Variables::Id> match;
+ std::set_intersection(
+ vars.begin(), vars.end(), ids.begin(), ids.end(), std::back_inserter(match));
+ return !match.empty();
+ }
+
MetadataAvailable getMetadataAvailable() const {
return _metadataAvailable;
}
@@ -91,9 +102,10 @@ struct DepsTracker {
_needSortKey = needSortKey;
}
- std::set<std::string> fields; // The names of needed fields in dotted notation.
- bool needWholeDocument = false; // If true, ignore 'fields' and assume the whole document is
- // needed.
+ std::set<std::string> fields; // Names of needed fields in dotted notation.
+ std::set<Variables::Id> vars; // IDs of referenced variables.
+ bool needWholeDocument = false; // If true, ignore 'fields'; the whole document is needed.
+
private:
/**
* Appends the meta projections for the sort key and/or text score to 'bb' if necessary. Returns
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index a9d9f74181a..d51ad84ab67 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/value.h"
@@ -163,7 +164,15 @@ splitMatchByModifiedFields(const boost::intrusive_ptr<DocumentSourceMatch>& matc
Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
- invariant(*itr == this && (std::next(itr) != container->end()));
+ invariant(*itr == this);
+
+ // If we are at the end of the pipeline, only optimize in the special case of a cache stage.
+ if (std::next(itr) == container->end()) {
+ return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this)
+ ? doOptimizeAt(itr, container)
+ : container->end();
+ }
+
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) {
// We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 462afbecb2f..eba4e2acc4b 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -568,6 +568,19 @@ public:
enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers };
enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps };
+ struct MakePipelineOptions {
+ MakePipelineOptions(){};
+
+ bool optimize = true;
+ bool attachCursorSource = true;
+
+ // Ordinarily, a MongodInterface is injected into the pipeline at the point when the
+ // cursor source is added. If true, 'forceInjectMongod' will inject MongodInterfaces
+ // into the pipeline even if 'attachCursorSource' is false. If 'attachCursorSource' is
+ // true, then the value of 'forceInjectMongod' is irrelevant.
+ bool forceInjectMongod = false;
+ };
+
virtual ~MongodInterface(){};
/**
@@ -633,16 +646,30 @@ public:
const std::list<BSONObj>& originalIndexes) = 0;
/**
- * Parses a Pipeline from a vector of BSONObjs representing DocumentSources and readies it
- * for execution. The returned pipeline is optimized and has a cursor source prepared.
+ * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of
+ * the returned pipeline will depend upon the supplied MakePipelineOptions:
+ * - The boolean opts.optimize determines whether the pipeline will be optimized.
+ * - If opts.attachCursorSource is false, the pipeline will be returned without attempting
+ * to add an initial cursor source.
+ * - If opts.forceInjectMongod is true, then a MongodInterface will be provided to each
+ * stage which requires one, regardless of whether a cursor source is attached to the
+ * pipeline.
*
* This function returns a non-OK status if parsing the pipeline failed.
- * NamespaceNotFound will be returned if ExpressionContext has a UUID and that UUID doesn't
- * exist anymore. That should be the only case where NamespaceNotFound gets returned.
*/
virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0;
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
+
+ /**
+ * Attaches a cursor source to the start of a pipeline. Performs no further optimization.
+ * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound
+ * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore.
+ * That should be the only case where NamespaceNotFound is returned.
+ */
+ virtual Status attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
* Returns a vector of owned BSONObjs, each of which contains details of an in-progress
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 133cffec12c..72e236ba870 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -236,18 +236,30 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
}
- pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults));
- pipeline.getValue()->optimizePipeline();
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ if (opts.attachCursorSource) {
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ }
return pipeline;
}
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
+ return Status::OK();
+ }
+
private:
deque<DocumentSource::GetNextResult> _mockResults;
};
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 3c8cf9db359..c7f82678eb1 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -256,15 +256,19 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
+ const bool scopeHasVariables = pExpCtx->variablesParseState.hasDefinedVariables();
for (auto&& facet : _facets) {
auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable());
deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end());
+ deps->vars.insert(subDepsTracker.vars.begin(), subDepsTracker.vars.end());
deps->needWholeDocument = deps->needWholeDocument || subDepsTracker.needWholeDocument;
deps->setNeedTextScore(deps->getNeedTextScore() || subDepsTracker.getNeedTextScore());
- if (deps->needWholeDocument && deps->getNeedTextScore()) {
+ // If there are variables defined at this stage's scope, there may be dependencies upon
+ // them in subsequent pipelines. Keep enumerating.
+ if (deps->needWholeDocument && deps->getNeedTextScore() && !scopeHasVariables) {
break;
}
}
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index feba9110eeb..8923aa1ab46 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -63,18 +63,30 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
}
- pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_results));
- pipeline.getValue()->optimizePipeline();
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ if (opts.attachCursorSource) {
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ }
return pipeline;
}
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) override {
+ pipeline->addInitialSource(DocumentSourceMock::create(_results));
+ return Status::OK();
+ }
+
private:
std::deque<DocumentSource::GetNextResult> _results;
};
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index b4a41b1b2da..21f3151f59c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/query_knobs.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -106,6 +107,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
_userPipeline = std::move(pipeline);
+ _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
+
for (auto&& varElem : letVariables) {
const auto varName = varElem.fieldNameStringData();
Variables::uassertValidNameForUserWrite(varName);
@@ -203,8 +206,6 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
}
auto inputDoc = nextInput.releaseDocument();
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- resolveLetVariables(inputDoc, &_fromExpCtx->variables);
// If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind,
// '_unwindSrc' would be non-null, and we would not have made it here.
@@ -217,7 +218,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
_resolvedPipeline.back() = matchStage;
}
- auto pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ auto pipeline = buildPipeline(inputDoc);
std::vector<Value> results;
int objsize = 0;
@@ -238,6 +239,53 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() {
return output.freeze();
}
+std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline(
+ const Document& inputDoc) {
+ // Copy all 'let' variables into the foreign pipeline's expression context.
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+
+ // Resolve the 'let' variables to values per the given input document.
+ resolveLetVariables(inputDoc, &_fromExpCtx->variables);
+
+ // If we don't have a cache, build and return the pipeline immediately.
+ if (!_cache || _cache->isAbandoned()) {
+ return uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ }
+
+ // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
+ // cursor source. If the cache is present and serving, then we will not be adding a cursor
+ // source later, so inject a mongod interface into all stages that need one.
+ MongodInterface::MakePipelineOptions pipelineOpts;
+
+ pipelineOpts.optimize = false;
+ pipelineOpts.attachCursorSource = false;
+ pipelineOpts.forceInjectMongod = _cache->isServing();
+
+ // Construct the basic pipeline without a cache stage.
+ auto pipeline =
+ uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+
+ // Add the cache stage at the end and optimize. During the optimization process, the cache will
+ // either move itself to the correct position in the pipeline, or will abandon itself if no
+ // suitable cache position exists.
+ pipeline->addFinalSource(
+ DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr()));
+
+ pipeline->optimizePipeline();
+
+ if (!_cache->isServing()) {
+ // The cache has either been abandoned or has not yet been built. Attach a cursor.
+ uassertStatusOK(_mongod->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get()));
+ }
+
+ // If the cache has been abandoned, release it.
+ if (_cache->isAbandoned()) {
+ _cache.reset();
+ }
+
+ return pipeline;
+}
+
DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const {
std::set<std::string> modifiedPaths{_as.fullPath()};
if (_unwindSrc) {
@@ -488,9 +536,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() {
_pipeline->dispose(pExpCtx->opCtx);
}
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- resolveLetVariables(*_input, &_fromExpCtx->variables);
- _pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx));
+ _pipeline = buildPipeline(*_input);
// The $lookup stage takes responsibility for disposing of its Pipeline, since it will
// potentially be used by multiple OperationContexts, and the $lookup stage is part of an
@@ -613,9 +659,30 @@ void DocumentSourceLookUp::serializeToArray(
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
if (wasConstructedWithPipelineSyntax()) {
+ // Copy all 'let' variables into the foreign pipeline's expression context.
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+
+ DepsTracker subDeps(deps->getMetadataAvailable());
+
+ // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
+ // declared by this $lookup and variables declared externally.
+ for (auto&& source : pipeline->getSources()) {
+ source->getDependencies(&subDeps);
+ }
+
+ // Add the 'let' dependencies to the tracker. Because the caller is only interested in
+ // references to external variables, filter out any subpipeline references to 'let'
+ // variables declared by this $lookup.
for (auto&& letVar : _letVariables) {
letVar.expression->addDependencies(deps);
+ subDeps.vars.erase(letVar.id);
}
+
+ // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer
+ // to the fields from the foreign collection rather than the local collection.
+ deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end());
} else {
deps->fields.insert(_localField->fullPath());
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 3645223bea7..9727d0e92db 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
@@ -126,6 +127,15 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ static boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ size_t maxCacheSizeBytes) {
+ auto dsLookup = createFromBson(elem, pExpCtx);
+ static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes);
+ return dsLookup;
+ }
+
/**
* Builds the BSONObj used to query the foreign collection and wraps it in a $match.
*/
@@ -158,6 +168,10 @@ public:
return _variablesParseState;
}
+ std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) {
+ return buildPipeline(inputDoc);
+ }
+
protected:
void doDispose() final;
@@ -229,11 +243,28 @@ private:
void resolveLetVariables(const Document& localDoc, Variables* variables);
/**
+ * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
+ * cursor and/or cache source as appropriate.
+ */
+ std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc);
+
+ /**
* The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that
* is executed in that it will not include optimizations or resolved views.
*/
std::string getUserPipelineDefinition();
+ /**
+ * Reinitialize the cache with a new max size. May only be called if this DSLookup was created
+ * with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added
+ * to it.
+ */
+ void reInitializeCache(size_t maxCacheSizeBytes) {
+ invariant(wasConstructedWithPipelineSyntax());
+ invariant(!_cache || (_cache->isBuilding() && _cache->sizeBytes() == 0));
+ _cache.emplace(maxCacheSizeBytes);
+ }
+
NamespaceString _fromNs;
NamespaceString _resolvedNs;
FieldPath _as;
@@ -249,6 +280,12 @@ private:
Variables _variables;
VariablesParseState _variablesParseState;
+ // Caches documents returned by the non-correlated prefix of the $lookup pipeline during the
+ // first iteration, up to a specified size limit in bytes. If this limit is not exceeded by the
+ // time we hit EOF, subsequent iterations of the pipeline will draw from the cache rather than
+ // from a cursor source.
+ boost::optional<SequentialDocumentCache> _cache;
+
// The ExpressionContext used when performing aggregation pipelines against the '_resolvedNs'
// namespace.
boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 66c408aa0d2..d60e8b1a6e6 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -88,18 +88,30 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
}
- pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults));
- pipeline.getValue()->optimizePipeline();
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ if (opts.attachCursorSource) {
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ }
return pipeline;
}
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
+ return Status::OK();
+ }
+
private:
deque<DocumentSource::GetNextResult> _mockResults;
};
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 2f7ac7852e4..dcbc2b55814 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/stub_mongod_interface.h"
#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/query_knobs.h"
namespace mongo {
namespace {
@@ -53,6 +54,8 @@ using std::vector;
// This provides access to getExpCtx(), but we'll use a different name for this test suite.
using DocumentSourceLookUpTest = AggregationContextFixture;
+const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load();
+const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner;
// A 'let' variable defined in a $lookup stage is expected to be available to all sub-pipelines. For
// sub-pipelines below the immediate one, they are passed to via ExpressionContext. This test
@@ -168,7 +171,6 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) {
ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax());
}
-
TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedNamespaces) {
auto stageSpec =
BSON("$lookup" << BSON("from"
@@ -407,12 +409,17 @@ TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) {
//
/**
- * A mock MongodInterface which allows mocking a foreign pipeline.
+ * A mock MongodInterface which allows mocking a foreign pipeline. If 'removeLeadingQueryStages' is
+ * true then any $match, $sort or $project fields at the start of the pipeline will be removed,
+ * simulating the pipeline changes which occur when PipelineD::prepareCursorSource absorbs stages
+ * into the PlanExecutor.
*/
class MockMongodInterface final : public StubMongodInterface {
public:
- MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults)
- : _mockResults(std::move(mockResults)) {}
+ MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults,
+ bool removeLeadingQueryStages = false)
+ : _mockResults(std::move(mockResults)),
+ _removeLeadingQueryStages(removeLeadingQueryStages) {}
bool isSharded(const NamespaceString& ns) final {
return false;
@@ -420,20 +427,42 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (!pipeline.isOK()) {
return pipeline.getStatus();
}
- pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults));
- pipeline.getValue()->optimizePipeline();
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ if (opts.attachCursorSource) {
+ uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ }
return pipeline;
}
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
+ if (pipeline->popFrontStageWithName("$match") ||
+ pipeline->popFrontStageWithName("$sort") ||
+ pipeline->popFrontStageWithName("$project")) {
+ continue;
+ }
+ break;
+ }
+
+ pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
+ return Status::OK();
+ }
+
private:
deque<DocumentSource::GetNextResult> _mockResults;
+ bool _removeLeadingQueryStages = false;
};
TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) {
@@ -590,5 +619,381 @@ TEST_F(DocumentSourceLookUpTest, LookupReportsFieldsModifiedByAbsorbedUnwind) {
lookup->dispose();
}
+BSONObj sequentialCacheStageObj(const StringData status = "kBuilding"_sd,
+ const long long maxSizeBytes = kDefaultMaxCacheSize) {
+ return BSON("$sequentialCache" << BSON("maxSizeBytes" << maxSizeBytes << "status" << status));
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, "
+ "{$addFields: {varField: '$$var1'}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ auto expectedPipe =
+ fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, "
+ << sequentialCacheStageObj()
+ << ", {$addFields: {varField: {$const: 5} }}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest,
+ ShouldDiscoverVariablesReferencedInFacetPipelineAfterAnExhaustiveAllStage) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // In the $facet stage here, the correlated $match stage comes after a $group stage which
+ // returns EXHAUSTIVE_ALL for its dependencies. Verify that we continue enumerating the $facet
+ // pipeline's variable dependencies after this point, so that the $facet stage is correctly
+ // identified as correlated and the cache is placed before it in the $lookup pipeline.
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, "
+ "{$facet: {facetPipe: [{$group: {_id: '$_id'}}, {$match: {$expr: {$eq: ['$_id', "
+ "'$$var1']}}}]}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ // TODO SERVER-30991: $match within $facet should optimize to $const.
+ auto expectedPipe =
+ fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, "
+ << sequentialCacheStageObj()
+ << ", {$facet: {facetPipe: [{$group: {_id: '$_id'}}, {$match: "
+ "{$expr: {$eq: ['$_id', '$$var1']}}}]}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest,
+ ShouldIgnoreLocalVariablesShadowingLetVariablesWhenFindingNonCorrelatedPrefix) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // The $project stage defines a local variable with the same name as the $lookup 'let' variable.
+ // Verify that the $project is identified as non-correlated and the cache is placed after it.
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: 1}}, {$sort: {x: 1}}, "
+ "{$project: {_id: false, projectedField: {$let: {vars: {var1: 'abc'}, in: "
+ "'$$var1'}}}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', "
+ "as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ auto expectedPipe = fromjson(
+ str::stream()
+ << "[{mock: {}}, {$match: {x: 1}}, {$sort: {sortKey: {x: 1}}}, {$project: {_id: false, "
+ "projectedField: {$let: {vars: {var1: {$const: 'abc'}}, in: '$$var1'}}}},"
+ << sequentialCacheStageObj()
+ << ", {$addFields: {varField: {$sum: ['$x', {$const: 5}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // Create a $lookup stage whose pipeline contains nested $lookups. The third-level $lookup
+ // refers to a 'let' variable defined in the top-level $lookup. Verify that the second-level
+ // $lookup is correctly identified as a correlated stage and the cache is placed before it.
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {from: 'coll', as: 'as', let: {var1: '$_id'}, pipeline: [{$match: "
+ "{x:1}}, {$sort: {x: 1}}, {$lookup: {from: 'coll', as: 'subas', pipeline: "
+ "[{$match: {x: 1}}, {$lookup: {from: 'coll', as: 'subsubas', pipeline: [{$match: "
+ "{$expr: {$eq: ['$y', '$$var1']}}}]}}]}}, {$addFields: {varField: '$$var1'}}]}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ auto expectedPipe =
+ fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, "
+ << sequentialCacheStageObj()
+ << ", {$lookup: {from: 'coll', as: 'subas', let: {}, pipeline: "
+ "[{$match: {x: 1}}, {$lookup: {from: 'coll', as: 'subsubas', "
+ "pipeline: [{$match: {$expr: {$eq: ['$y', '$$var1']}}}]}}]}}, "
+ "{$addFields: {varField: {$const: 5}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest,
+ ShouldIgnoreNestedLookupLetVariablesShadowingOuterLookupLetVariablesWhenFindingPrefix) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // The nested $lookup stage defines a 'let' variable with the same name as the top-level 'let'.
+ // Verify the nested $lookup is identified as non-correlated and the cache is placed after it.
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, "
+ "{$lookup: {let: {var1: '$y'}, pipeline: [{$match: {$expr: { $eq: ['$z', "
+ "'$$var1']}}}], from: 'coll', as: 'subas'}}, {$addFields: {varField: '$$var1'}}], "
+ "from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ auto expectedPipe =
+ fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, "
+ "{$lookup: {from: 'coll', as: 'subas', let: {var1: '$y'}, "
+ "pipeline: [{$match: {$expr: { $eq: ['$z', '$$var1']}}}]}}, "
+ << sequentialCacheStageObj()
+ << ", {$addFields: {varField: {$const: 5} }}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldCacheEntirePipelineIfNonCorrelated) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, {$lookup: "
+ "{pipeline: [{$match: {y: 5}}], from: 'coll', as: 'subas'}}, {$addFields: "
+ "{constField: 5}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ lookupStage->injectMongodInterface(
+ std::shared_ptr<MockMongodInterface>(new MockMongodInterface({})));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
+ ASSERT(subPipeline);
+
+ auto expectedPipe =
+ fromjson(str::stream()
+ << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, {$lookup: {from: "
+ "'coll', as: 'subas', let: {}, pipeline: [{$match: {y: 5}}]}}, {$addFields: "
+ "{constField: {$const: 5}}}, "
+ << sequentialCacheStageObj()
+ << "]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
+TEST_F(DocumentSourceLookUpTest,
+ ShouldReplaceNonCorrelatedPrefixWithCacheAfterFirstSubPipelineIteration) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson(
+ "{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: "
+ "1}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ // Prepare the mocked local and foreign sources.
+ auto mockLocalSource = DocumentSourceMock::create(
+ {Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}});
+
+ lookupStage->setSource(mockLocalSource.get());
+
+ deque<DocumentSource::GetNextResult> mockForeignContents{
+ Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}};
+
+ lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents));
+
+ // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
+ ASSERT(subPipeline);
+
+ auto expectedPipe = fromjson(
+ str::stream() << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, "
+ << sequentialCacheStageObj("kBuilding")
+ << ", {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+
+ // Verify the first result (non-cached) from the $lookup, for local document {_id: 0}.
+ auto nonCachedResult = lookupStage->getNext();
+ ASSERT(nonCachedResult.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ Document{fromjson(
+ "{_id: 0, as: [{x: 0, varField: 0}, {x: 1, varField: 1}, {x: 2, varField: 2}]}")},
+ nonCachedResult.getDocument());
+
+ // Preview the subpipeline that will be used to process the second local document {_id: 1}. The
+ // sub-pipeline cache has been built on the first iteration, and is now serving in place of the
+ // mocked foreign input source and the non-correlated stages at the start of the pipeline.
+ subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 1));
+ ASSERT(subPipeline);
+
+ expectedPipe =
+ fromjson(str::stream() << "[" << sequentialCacheStageObj("kServing")
+ << ", {$addFields: {varField: {$sum: ['$x', {$const: 1}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+
+ // Verify that the rest of the results are correctly constructed from the cache.
+ auto cachedResult = lookupStage->getNext();
+ ASSERT(cachedResult.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ Document{fromjson(
+ "{_id: 1, as: [{x: 0, varField: 1}, {x: 1, varField: 2}, {x: 2, varField: 3}]}")},
+ cachedResult.getDocument());
+
+ cachedResult = lookupStage->getNext();
+ ASSERT(cachedResult.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ Document{fromjson(
+ "{_id: 2, as: [{x: 0, varField: 2}, {x: 1, varField: 3}, {x: 2, varField: 4}]}")},
+ cachedResult.getDocument());
+}
+
+TEST_F(DocumentSourceLookUpTest,
+ ShouldAbandonCacheIfMaxSizeIsExceededAfterFirstSubPipelineIteration) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ // Ensure the cache is abandoned after the first iteration by setting its max size to 0.
+ size_t maxCacheSizeBytes = 0;
+ auto docSource = DocumentSourceLookUp::createFromBsonWithCacheSize(
+ fromjson(
+ "{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: "
+ "1}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx,
+ maxCacheSizeBytes);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ // Prepare the mocked local and foreign sources.
+ auto mockLocalSource = DocumentSourceMock::create({Document{{"_id", 0}}, Document{{"_id", 1}}});
+
+ lookupStage->setSource(mockLocalSource.get());
+
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}},
+ Document{{"x", 1}}};
+
+ lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents));
+
+ // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
+ ASSERT(subPipeline);
+
+ auto expectedPipe = fromjson(
+ str::stream() << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, "
+ << sequentialCacheStageObj("kBuilding", 0ll)
+ << ", {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+
+ // Get the first result from the stage, for local document {_id: 0}.
+ auto firstResult = lookupStage->getNext();
+ ASSERT_DOCUMENT_EQ(
+ Document{fromjson("{_id: 0, as: [{x: 0, varField: 0}, {x: 1, varField: 1}]}")},
+ firstResult.getDocument());
+
+ // Preview the subpipeline that will be used to process the second local document {_id: 1}. The
+ // sub-pipeline cache exceeded its max size on the first iteration, was abandoned, and is now
+ // absent from the pipeline.
+ subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 1));
+ ASSERT(subPipeline);
+
+ expectedPipe = fromjson(str::stream()
+ << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, "
+ "{$addFields: {varField: {$sum: ['$x', {$const: 1}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+
+ // Verify that the second document is constructed correctly without the cache.
+ auto secondResult = lookupStage->getNext();
+
+ ASSERT_DOCUMENT_EQ(
+ Document{fromjson("{_id: 1, as: [{x: 0, varField: 1}, {x: 1, varField: 2}]}")},
+ secondResult.getDocument());
+}
+
+TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPlanExecutor) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: { $gte: ['$x', "
+ "'$$var1']}}}, {$sort: {x: 1}}, {$addFields: {varField: {$sum: ['$x', "
+ "'$$var1']}}}], from: 'coll', as: 'as'}}")
+ .firstElement(),
+ expCtx);
+
+ auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
+ ASSERT(lookupStage);
+
+ const bool removeLeadingQueryStages = true;
+
+ lookupStage->injectMongodInterface(std::shared_ptr<MockMongodInterface>(
+ new MockMongodInterface({}, removeLeadingQueryStages)));
+
+ auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
+ ASSERT(subPipeline);
+
+ auto expectedPipe =
+ fromjson("[{mock: {}}, {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]");
+
+ ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe)));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 84e35a01c82..f50faf85749 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -64,7 +64,14 @@ Value DocumentSourceMatch::serialize(boost::optional<ExplainOptions::Verbosity>
}
intrusive_ptr<DocumentSource> DocumentSourceMatch::optimize() {
- return getQuery().isEmpty() ? nullptr : this;
+ if (getQuery().isEmpty()) {
+ return nullptr;
+ }
+
+ // TODO SERVER-30991: thread optimization down to the MatchExpression.
+ //_expression->optimize();
+
+ return this;
}
DocumentSource::GetNextResult DocumentSourceMatch::getNext() {
@@ -471,6 +478,9 @@ BSONObj DocumentSourceMatch::getQuery() const {
}
DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* deps) const {
+ // Get all field or variable dependencies.
+ _expression->addDependencies(deps);
+
if (isTextQuery()) {
// A $text aggregation field should return EXHAUSTIVE_FIELDS, since we don't necessarily
// know what field it will be searching without examining indices.
@@ -479,7 +489,6 @@ DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker*
return EXHAUSTIVE_FIELDS;
}
- _expression->addDependencies(deps);
return SEE_NEXT;
}
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
new file mode 100644
index 00000000000..7eb927399dd
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp
@@ -0,0 +1,147 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
+
+#include "mongo/db/pipeline/document_source_match.h"
+
+namespace mongo {
+
+constexpr StringData DocumentSourceSequentialDocumentCache::kStageName;
+
+DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, SequentialDocumentCache* cache)
+ : DocumentSource(expCtx), _cache(cache) {
+ invariant(_cache);
+ invariant(!_cache->isAbandoned());
+
+ if (_cache->isServing()) {
+ _cache->restartIteration();
+ }
+}
+
+DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() {
+ // Either we're reading from the cache, or we have an input source to build the cache from.
+ invariant(pSource || _cache->isServing());
+
+ pExpCtx->checkForInterrupt();
+
+ if (_cache->isServing()) {
+ auto nextDoc = _cache->getNext();
+ return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF());
+ }
+
+ auto nextResult = pSource->getNext();
+
+ if (!_cache->isAbandoned()) {
+ if (nextResult.isEOF()) {
+ _cache->freeze();
+ } else {
+ _cache->add(nextResult.getDocument());
+ }
+ }
+
+ return nextResult;
+}
+
+Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOptimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ // The DocumentSourceSequentialDocumentCache should always be the last stage in the pipeline
+ // pre-optimization. By the time optimization reaches this point, all preceding stages are in
+ // the final positions which they would have occupied if no cache stage was present.
+ invariant(_hasOptimizedPos || std::next(itr) == container->end());
+ invariant((*itr).get() == this);
+
+ // If we have already optimized our position, stay where we are.
+ if (_hasOptimizedPos) {
+ return std::next(itr);
+ }
+
+ // Mark this stage as having optimized itself.
+ _hasOptimizedPos = true;
+
+ // If the cache is the only stage in the pipeline, return immediately.
+ if (itr == container->begin()) {
+ return container->end();
+ }
+
+ // Pop the cache stage off the back of the pipeline.
+ auto cacheStage = std::move(*itr);
+ container->erase(itr);
+
+ // Get all variable IDs defined in this scope.
+ auto varIDs = pExpCtx->variablesParseState.getDefinedVariableIDs();
+
+ auto prefixSplit = container->begin();
+ DepsTracker deps;
+
+ // Iterate through the pipeline stages until we find one which references an external variable.
+ for (; prefixSplit != container->end(); ++prefixSplit) {
+ (*prefixSplit)->getDependencies(&deps);
+
+ if (deps.hasVariableReferenceTo(varIDs)) {
+ break;
+ }
+ }
+
+ // The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If
+ // the split point is the first stage, then the entire pipeline is correlated and we should not
+ // attempt to perform any caching. Abandon the cache and return.
+ if (prefixSplit == container->begin()) {
+ _cache->abandon();
+ return container->end();
+ }
+
+ // If the cache has been populated and is serving results, remove the non-correlated prefix.
+ if (_cache->isServing()) {
+ container->erase(container->begin(), prefixSplit);
+ }
+
+ container->insert(prefixSplit, std::move(cacheStage));
+
+ return container->end();
+}
+
+Value DocumentSourceSequentialDocumentCache::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ if (explain) {
+ return Value(Document{
+ {kStageName,
+ Document{{"maxSizeBytes"_sd, Value(static_cast<long long>(_cache->maxSizeBytes()))},
+ {"status"_sd,
+ _cache->isBuilding() ? "kBuilding"_sd : _cache->isServing()
+ ? "kServing"_sd
+ : "kAbandoned"_sd}}}});
+ }
+
+ return Value();
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
new file mode 100644
index 00000000000..af96ae8e2ab
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/sequential_document_cache.h"
+
+namespace mongo {
+
+/**
+ * A DocumentSourceSequentialDocumentCache manages an underlying SequentialDocumentCache. If the
+ * cache's status is 'kBuilding', DocumentSourceSequentialDocumentCache will retrieve documents from
+ * the preceding pipeline stage, add them to the cache, and pass them through to the following
+ * pipeline stage. If the cache is in 'kServing' mode, DocumentSourceSequentialDocumentCache will
+ * return results directly from the cache rather than from a preceding stage. It does not have a
+ * registered parser and cannot be created from BSON.
+ */
+class DocumentSourceSequentialDocumentCache final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$sequentialCache"_sd;
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ StageConstraints constraints() const {
+ StageConstraints constraints;
+
+ if (_cache->isServing()) {
+ constraints.requiredPosition = PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ }
+
+ return constraints;
+ }
+
+ GetNextResult getNext() final;
+
+ static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, SequentialDocumentCache* cache) {
+ return new DocumentSourceSequentialDocumentCache(pExpCtx, cache);
+ }
+
+protected:
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
+private:
+ DocumentSourceSequentialDocumentCache(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ SequentialDocumentCache* cache);
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ SequentialDocumentCache* _cache;
+
+ bool _hasOptimizedPos = false;
+};
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp
index c5877f5eee1..75b7ba50c53 100644
--- a/src/mongo/db/pipeline/expression.cpp
+++ b/src/mongo/db/pipeline/expression.cpp
@@ -684,7 +684,7 @@ intrusive_ptr<Expression> ExpressionCoerceToBool::optimize() {
return intrusive_ptr<Expression>(this);
}
-void ExpressionCoerceToBool::addDependencies(DepsTracker* deps) const {
+void ExpressionCoerceToBool::_doAddDependencies(DepsTracker* deps) const {
pExpression->addDependencies(deps);
}
@@ -942,7 +942,7 @@ intrusive_ptr<Expression> ExpressionConstant::optimize() {
return intrusive_ptr<Expression>(this);
}
-void ExpressionConstant::addDependencies(DepsTracker* deps) const {
+void ExpressionConstant::_doAddDependencies(DepsTracker* deps) const {
/* nothing to do */
}
@@ -1268,7 +1268,7 @@ Value ExpressionDateFromParts::evaluate(const Document& root) const {
MONGO_UNREACHABLE;
}
-void ExpressionDateFromParts::addDependencies(DepsTracker* deps) const {
+void ExpressionDateFromParts::_doAddDependencies(DepsTracker* deps) const {
if (_year) {
_year->addDependencies(deps);
}
@@ -1392,7 +1392,7 @@ Value ExpressionDateFromString::evaluate(const Document& root) const {
return Value(tzdb->fromString(dateTimeString, timeZone));
}
-void ExpressionDateFromString::addDependencies(DepsTracker* deps) const {
+void ExpressionDateFromString::_doAddDependencies(DepsTracker* deps) const {
_dateString->addDependencies(deps);
if (_timeZone) {
_timeZone->addDependencies(deps);
@@ -1532,7 +1532,7 @@ Value ExpressionDateToParts::evaluate(const Document& root) const {
}
}
-void ExpressionDateToParts::addDependencies(DepsTracker* deps) const {
+void ExpressionDateToParts::_doAddDependencies(DepsTracker* deps) const {
_date->addDependencies(deps);
if (_timeZone) {
_timeZone->addDependencies(deps);
@@ -1635,7 +1635,7 @@ Value ExpressionDateToString::evaluate(const Document& root) const {
return Value(timeZone->formatDate(_format, date.coerceToDate()));
}
-void ExpressionDateToString::addDependencies(DepsTracker* deps) const {
+void ExpressionDateToString::_doAddDependencies(DepsTracker* deps) const {
_date->addDependencies(deps);
if (_timeZone) {
_timeZone->addDependencies(deps);
@@ -1739,7 +1739,7 @@ intrusive_ptr<Expression> ExpressionObject::optimize() {
return this;
}
-void ExpressionObject::addDependencies(DepsTracker* deps) const {
+void ExpressionObject::_doAddDependencies(DepsTracker* deps) const {
for (auto&& pair : _expressions) {
pair.second->addDependencies(deps);
}
@@ -1834,13 +1834,15 @@ intrusive_ptr<Expression> ExpressionFieldPath::optimize() {
return intrusive_ptr<Expression>(this);
}
-void ExpressionFieldPath::addDependencies(DepsTracker* deps) const {
+void ExpressionFieldPath::_doAddDependencies(DepsTracker* deps) const {
if (_variable == Variables::kRootId) { // includes CURRENT when it is equivalent to ROOT.
if (_fieldPath.getPathLength() == 1) {
deps->needWholeDocument = true; // need full doc if just "$$ROOT"
} else {
deps->fields.insert(_fieldPath.tail().fullPath());
}
+ } else if (Variables::isUserDefinedVariable(_variable)) {
+ deps->vars.insert(_variable);
}
}
@@ -2041,7 +2043,7 @@ Value ExpressionFilter::evaluate(const Document& root) const {
return Value(std::move(output));
}
-void ExpressionFilter::addDependencies(DepsTracker* deps) const {
+void ExpressionFilter::_doAddDependencies(DepsTracker* deps) const {
_input->addDependencies(deps);
_filter->addDependencies(deps);
}
@@ -2129,7 +2131,6 @@ intrusive_ptr<Expression> ExpressionLet::optimize() {
it->second.expression = it->second.expression->optimize();
}
- // TODO be smarter with constant "variables"
_subExpression = _subExpression->optimize();
return this;
@@ -2157,17 +2158,16 @@ Value ExpressionLet::evaluate(const Document& root) const {
return _subExpression->evaluate(root);
}
-void ExpressionLet::addDependencies(DepsTracker* deps) const {
- for (VariableMap::const_iterator it = _variables.begin(), end = _variables.end(); it != end;
- ++it) {
- it->second.expression->addDependencies(deps);
+void ExpressionLet::_doAddDependencies(DepsTracker* deps) const {
+ for (auto&& idToNameExp : _variables) {
+ // Add the external dependencies from the 'vars' statement.
+ idToNameExp.second.expression->addDependencies(deps);
}
- // TODO be smarter when CURRENT is a bound variable
+ // Add subexpression dependencies, which may contain a mix of local and external variable refs.
_subExpression->addDependencies(deps);
}
-
/* ------------------------- ExpressionMap ----------------------------- */
REGISTER_EXPRESSION(map, ExpressionMap::parse);
@@ -2269,7 +2269,7 @@ Value ExpressionMap::evaluate(const Document& root) const {
return Value(std::move(output));
}
-void ExpressionMap::addDependencies(DepsTracker* deps) const {
+void ExpressionMap::_doAddDependencies(DepsTracker* deps) const {
_input->addDependencies(deps);
_each->addDependencies(deps);
}
@@ -2346,7 +2346,7 @@ Value ExpressionMeta::evaluate(const Document& root) const {
MONGO_UNREACHABLE;
}
-void ExpressionMeta::addDependencies(DepsTracker* deps) const {
+void ExpressionMeta::_doAddDependencies(DepsTracker* deps) const {
if (_metaType == MetaType::TEXT_SCORE) {
deps->setNeedTextScore(true);
}
@@ -2913,7 +2913,7 @@ intrusive_ptr<Expression> ExpressionNary::optimize() {
return this;
}
-void ExpressionNary::addDependencies(DepsTracker* deps) const {
+void ExpressionNary::_doAddDependencies(DepsTracker* deps) const {
for (auto&& operand : vpOperand) {
operand->addDependencies(deps);
}
@@ -3321,7 +3321,7 @@ intrusive_ptr<Expression> ExpressionReduce::optimize() {
return this;
}
-void ExpressionReduce::addDependencies(DepsTracker* deps) const {
+void ExpressionReduce::_doAddDependencies(DepsTracker* deps) const {
_input->addDependencies(deps);
_initial->addDependencies(deps);
_in->addDependencies(deps);
@@ -4155,7 +4155,7 @@ boost::intrusive_ptr<Expression> ExpressionSwitch::parse(
return expression;
}
-void ExpressionSwitch::addDependencies(DepsTracker* deps) const {
+void ExpressionSwitch::_doAddDependencies(DepsTracker* deps) const {
for (auto&& branch : _branches) {
branch.first->addDependencies(deps);
branch.second->addDependencies(deps);
@@ -4420,7 +4420,7 @@ Value ExpressionZip::serialize(bool explain) const {
<< serializedUseLongestLength)));
}
-void ExpressionZip::addDependencies(DepsTracker* deps) const {
+void ExpressionZip::_doAddDependencies(DepsTracker* deps) const {
std::for_each(
_inputs.begin(), _inputs.end(), [&deps](intrusive_ptr<Expression> inputExpression) -> void {
inputExpression->addDependencies(deps);
diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h
index ab82e3139e7..6532be87fb0 100644
--- a/src/mongo/db/pipeline/expression.h
+++ b/src/mongo/db/pipeline/expression.h
@@ -103,11 +103,17 @@ public:
}
/**
- * Add the fields used as input to this expression to 'deps'.
- *
- * Expressions are trees, so this is often recursive.
+ * Add the fields and variables used in this expression to 'deps'. References to variables which
+ * are local to a particular expression will be filtered out of the tracker upon return.
*/
- virtual void addDependencies(DepsTracker* deps) const = 0;
+ void addDependencies(DepsTracker* deps) {
+ _doAddDependencies(deps);
+
+ // Filter out references to any local variables.
+ if (_boundaryVariableId) {
+ deps->vars.erase(deps->vars.upper_bound(*_boundaryVariableId), deps->vars.end());
+ }
+ }
/**
* Serialize the Expression tree recursively.
@@ -205,7 +211,12 @@ public:
static void registerExpression(std::string key, Parser parser);
protected:
- Expression(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
+ Expression(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {
+ auto varIds = _expCtx->variablesParseState.getDefinedVariableIDs();
+ if (!varIds.empty()) {
+ _boundaryVariableId = *std::prev(varIds.end());
+ }
+ }
typedef std::vector<boost::intrusive_ptr<Expression>> ExpressionVector;
@@ -213,17 +224,18 @@ protected:
return _expCtx;
}
+ virtual void _doAddDependencies(DepsTracker* deps) const = 0;
+
private:
+ boost::optional<Variables::Id> _boundaryVariableId;
boost::intrusive_ptr<ExpressionContext> _expCtx;
};
-
-/// Inherit from ExpressionVariadic or ExpressionFixedArity instead of directly from this class.
+// Inherit from ExpressionVariadic or ExpressionFixedArity instead of directly from this class.
class ExpressionNary : public Expression {
public:
boost::intrusive_ptr<Expression> optimize() override;
Value serialize(bool explain) const override;
- void addDependencies(DepsTracker* deps) const override;
/*
Add an operand to the n-ary expression.
@@ -260,6 +272,8 @@ protected:
explicit ExpressionNary(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: Expression(expCtx) {}
+ void _doAddDependencies(DepsTracker* deps) const override;
+
ExpressionVector vpOperand;
};
@@ -418,7 +432,6 @@ public:
class ExpressionConstant final : public Expression {
public:
boost::intrusive_ptr<Expression> optimize() final;
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
Value serialize(bool explain) const final;
@@ -461,6 +474,9 @@ public:
return _value;
}
+protected:
+ void _doAddDependencies(DepsTracker* deps) const override;
+
private:
ExpressionConstant(const boost::intrusive_ptr<ExpressionContext>& expCtx, const Value& value);
@@ -504,13 +520,6 @@ public:
return evaluateDate(date, timeZone);
}
- void addDependencies(DepsTracker* deps) const final {
- _date->addDependencies(deps);
- if (_timeZone) {
- _timeZone->addDependencies(deps);
- }
- }
-
/**
* Always serializes to the full {date: <date arg>, timezone: <timezone arg>} format, leaving
* off the timezone if not specified.
@@ -594,6 +603,13 @@ protected:
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: Expression(expCtx), _opName(opName) {}
+ void _doAddDependencies(DepsTracker* deps) const final {
+ _date->addDependencies(deps);
+ if (_timeZone) {
+ _timeZone->addDependencies(deps);
+ }
+ }
+
/**
* Subclasses should implement this to do their actual date-related logic. Uses 'timezone' to
* evaluate the expression against 'data'. If the user did not specify a time zone, 'timezone'
@@ -729,7 +745,6 @@ public:
class ExpressionCoerceToBool final : public Expression {
public:
boost::intrusive_ptr<Expression> optimize() final;
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
Value serialize(bool explain) const final;
@@ -737,6 +752,9 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Expression>& pExpression);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionCoerceToBool(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Expression>& pExpression);
@@ -833,13 +851,15 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document&) const final;
- void addDependencies(DepsTracker*) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionDateFromString(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::intrusive_ptr<Expression> dateString,
@@ -854,13 +874,15 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionDateFromParts(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::intrusive_ptr<Expression> year,
@@ -905,13 +927,15 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
/**
* The iso8601 argument controls whether to output ISO8601 elements or natural calendar.
@@ -933,13 +957,15 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionDateToString(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const std::string& format, // The format string.
@@ -1007,7 +1033,6 @@ public:
class ExpressionFieldPath final : public Expression {
public:
boost::intrusive_ptr<Expression> optimize() final;
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
Value serialize(bool explain) const final;
@@ -1040,6 +1065,9 @@ public:
ComputedPaths getComputedPaths(const std::string& exprFieldPath,
Variables::Id renamingVar) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionFieldPath(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const std::string& fieldPath,
@@ -1073,13 +1101,15 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::string varName,
@@ -1177,7 +1207,6 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -1195,6 +1224,9 @@ public:
typedef std::map<Variables::Id, NameAndExpression> VariableMap;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionLet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const VariableMap& vars,
@@ -1236,7 +1268,6 @@ public:
boost::intrusive_ptr<Expression> optimize() final;
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -1246,6 +1277,9 @@ public:
ComputedPaths getComputedPaths(const std::string& exprFieldPath,
Variables::Id renamingVar) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionMap(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -1264,13 +1298,15 @@ class ExpressionMeta final : public Expression {
public:
Value serialize(bool explain) const final;
Value evaluate(const Document& root) const final;
- void addDependencies(DepsTracker* deps) const final;
static boost::intrusive_ptr<Expression> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
BSONElement expr,
const VariablesParseState& vps);
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
enum MetaType {
TEXT_SCORE,
@@ -1364,7 +1400,6 @@ public:
class ExpressionObject final : public Expression {
public:
boost::intrusive_ptr<Expression> optimize() final;
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
Value serialize(bool explain) const final;
@@ -1391,6 +1426,9 @@ public:
ComputedPaths getComputedPaths(const std::string& exprFieldPath,
Variables::Id renamingVar) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
ExpressionObject(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -1449,7 +1487,6 @@ public:
explicit ExpressionReduce(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: Expression(expCtx) {}
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
boost::intrusive_ptr<Expression> optimize() final;
static boost::intrusive_ptr<Expression> parse(
@@ -1458,6 +1495,9 @@ public:
const VariablesParseState& vpsIn);
Value serialize(bool explain) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
boost::intrusive_ptr<Expression> _input;
boost::intrusive_ptr<Expression> _initial;
@@ -1676,7 +1716,6 @@ public:
explicit ExpressionSwitch(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: Expression(expCtx) {}
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
boost::intrusive_ptr<Expression> optimize() final;
static boost::intrusive_ptr<Expression> parse(
@@ -1685,6 +1724,9 @@ public:
const VariablesParseState& vpsIn);
Value serialize(bool explain) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
using ExpressionPair =
std::pair<boost::intrusive_ptr<Expression>, boost::intrusive_ptr<Expression>>;
@@ -1795,7 +1837,6 @@ public:
explicit ExpressionZip(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: Expression(expCtx) {}
- void addDependencies(DepsTracker* deps) const final;
Value evaluate(const Document& root) const final;
boost::intrusive_ptr<Expression> optimize() final;
static boost::intrusive_ptr<Expression> parse(
@@ -1804,6 +1845,9 @@ public:
const VariablesParseState& vpsIn);
Value serialize(bool explain) const final;
+protected:
+ void _doAddDependencies(DepsTracker* deps) const final;
+
private:
bool _useLongestLength = false;
ExpressionVector _inputs;
diff --git a/src/mongo/db/pipeline/expression_test.cpp b/src/mongo/db/pipeline/expression_test.cpp
index bd3abd1ffc3..048dcbe769d 100644
--- a/src/mongo/db/pipeline/expression_test.cpp
+++ b/src/mongo/db/pipeline/expression_test.cpp
@@ -2663,6 +2663,68 @@ TEST(ExpressionObjectDependencies, FieldPathsShouldBeAddedToDependencies) {
ASSERT_EQ(deps.fields.count("c.d"), 1UL);
};
+TEST(ExpressionObjectDependencies, VariablesShouldBeAddedToDependencies) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ auto varID = expCtx->variablesParseState.defineVariable("var1");
+ auto fieldPath = ExpressionFieldPath::parse(expCtx, "$$var1", expCtx->variablesParseState);
+ DepsTracker deps;
+ fieldPath->addDependencies(&deps);
+ ASSERT_EQ(deps.vars.size(), 1UL);
+ ASSERT_EQ(deps.vars.count(varID), 1UL);
+}
+
+TEST(ExpressionObjectDependencies, LocalLetVariablesShouldBeFilteredOutOfDependencies) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ expCtx->variablesParseState.defineVariable("var1");
+ auto letSpec = BSON("$let" << BSON("vars" << BSON("var2"
+ << "abc")
+ << "in"
+ << BSON("$multiply" << BSON_ARRAY("$$var1"
+ << "$$var2"))));
+ auto expressionLet =
+ ExpressionLet::parse(expCtx, letSpec.firstElement(), expCtx->variablesParseState);
+ DepsTracker deps;
+ expressionLet->addDependencies(&deps);
+ ASSERT_EQ(deps.vars.size(), 1UL);
+ ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin());
+}
+
+TEST(ExpressionObjectDependencies, LocalMapVariablesShouldBeFilteredOutOfDependencies) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ expCtx->variablesParseState.defineVariable("var1");
+ auto mapSpec = BSON("$map" << BSON("input"
+ << "$field1"
+ << "as"
+ << "var2"
+ << "in"
+ << BSON("$multiply" << BSON_ARRAY("$$var1"
+ << "$$var2"))));
+
+ auto expressionMap =
+ ExpressionMap::parse(expCtx, mapSpec.firstElement(), expCtx->variablesParseState);
+ DepsTracker deps;
+ expressionMap->addDependencies(&deps);
+ ASSERT_EQ(deps.vars.size(), 1UL);
+ ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin());
+}
+
+TEST(ExpressionObjectDependencies, LocalFilterVariablesShouldBeFilteredOutOfDependencies) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ expCtx->variablesParseState.defineVariable("var1");
+ auto filterSpec = BSON("$filter" << BSON("input" << BSON_ARRAY(1 << 2 << 3) << "as"
+ << "var2"
+ << "cond"
+ << BSON("$gt" << BSON_ARRAY("$$var1"
+ << "$$var2"))));
+
+ auto expressionFilter =
+ ExpressionFilter::parse(expCtx, filterSpec.firstElement(), expCtx->variablesParseState);
+ DepsTracker deps;
+ expressionFilter->addDependencies(&deps);
+ ASSERT_EQ(deps.vars.size(), 1UL);
+ ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin());
+}
+
//
// Optimizations.
//
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index d4a00168e3d..34e04912e49 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -217,7 +217,7 @@ void Pipeline::optimizePipeline() {
// We could be swapping around stages during this process, so disconnect the pipeline to prevent
// us from entering a state with dangling pointers.
unstitch();
- while (itr != _sources.end() && std::next(itr) != _sources.end()) {
+ while (itr != _sources.end()) {
invariant((*itr).get());
itr = (*itr).get()->optimizeAt(itr, &_sources);
}
@@ -508,19 +508,35 @@ void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
_sources.push_front(source);
}
+void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) {
+ if (!_sources.empty()) {
+ source->setSource(_sources.back().get());
+ }
+ _sources.push_back(source);
+}
+
DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const {
DepsTracker deps(metadataAvailable);
+ const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables();
+ bool skipFieldsAndMetadataDeps = false;
bool knowAllFields = false;
bool knowAllMeta = false;
for (auto&& source : _sources) {
DepsTracker localDeps(deps.getMetadataAvailable());
DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);
- if (status == DocumentSource::NOT_SUPPORTED) {
+ deps.vars.insert(localDeps.vars.begin(), localDeps.vars.end());
+
+ if ((skipFieldsAndMetadataDeps |= (status == DocumentSource::NOT_SUPPORTED))) {
// Assume this stage needs everything. We may still know something about our
- // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
- // EXHAUSTIVE_META.
- break;
+ // dependencies if an earlier stage returned EXHAUSTIVE_FIELDS or EXHAUSTIVE_META. If
+ // this scope has variables, we need to keep enumerating the remaining stages but will
+ // skip adding any further field or metadata dependencies.
+ if (scopeHasVariables) {
+ continue;
+ } else {
+ break;
+ }
}
if (!knowAllFields) {
@@ -540,7 +556,9 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
}
- if (knowAllMeta && knowAllFields) {
+ // If there are variables defined at this pipeline's scope, there may be dependencies upon
+ // them in subsequent stages. Keep enumerating.
+ if (knowAllMeta && knowAllFields && !scopeHasVariables) {
break;
}
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 5da1a018289..c5e94837019 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -245,9 +245,11 @@ public:
*/
std::vector<Value> serialize() const;
- /// The initial source is special since it varies between mongos and mongod.
+ // The initial source is special since it varies between mongos and mongod.
void addInitialSource(boost::intrusive_ptr<DocumentSource> source);
+ void addFinalSource(boost::intrusive_ptr<DocumentSource> source);
+
/**
* Returns the next result from the pipeline, or boost::none if there are no more results.
*/
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 33043acde9e..8f15e12e8bb 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -191,7 +191,8 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) final {
// 'expCtx' may represent the settings for an aggregation pipeline on a different namespace
// than the DocumentSource this MongodImplementation is injected into, but both
// ExpressionContext instances should still have the same OperationContext.
@@ -202,7 +203,26 @@ public:
return pipeline.getStatus();
}
- pipeline.getValue()->optimizePipeline();
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ Status cursorStatus = Status::OK();
+
+ if (opts.attachCursorSource) {
+ cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ } else if (opts.forceInjectMongod) {
+ PipelineD::injectMongodInterface(pipeline.getValue().get());
+ }
+
+ return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+ }
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ invariant(_ctx->opCtx == expCtx->opCtx);
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
boost::optional<AutoGetCollectionForReadCommand> autoColl;
if (expCtx->uuid) {
@@ -226,10 +246,9 @@ public:
auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns);
uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata()));
- PipelineD::prepareCursorSource(
- autoColl->getCollection(), expCtx->ns, nullptr, pipeline.getValue().get());
+ PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
- return pipeline;
+ return Status::OK();
}
std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
@@ -460,6 +479,15 @@ BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
}
} // namespace
+void PipelineD::injectMongodInterface(Pipeline* pipeline) {
+ for (auto&& source : pipeline->_sources) {
+ if (auto needsMongod = dynamic_cast<DocumentSourceNeedsMongod*>(source.get())) {
+ needsMongod->injectMongodInterface(
+ std::make_shared<MongodImplementation>(pipeline->getContext()));
+ }
+ }
+}
+
void PipelineD::prepareCursorSource(Collection* collection,
const NamespaceString& nss,
const AggregationRequest* aggRequest,
@@ -470,13 +498,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
Pipeline::SourceContainer& sources = pipeline->_sources;
// Inject a MongodImplementation to sources that need them.
- for (auto&& source : sources) {
- DocumentSourceNeedsMongod* needsMongod =
- dynamic_cast<DocumentSourceNeedsMongod*>(source.get());
- if (needsMongod) {
- needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(expCtx));
- }
- }
+ injectMongodInterface(pipeline);
if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) {
return;
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 3c1acaac998..0da6ebe2f20 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -81,6 +81,11 @@ public:
const AggregationRequest* aggRequest,
Pipeline* pipeline);
+ /**
+ * Injects a MongodInterface into stages which require access to mongod-specific functionality.
+ */
+ static void injectMongodInterface(Pipeline* pipeline);
+
static std::string getPlanSummaryStr(const Pipeline* pipeline);
static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut);
diff --git a/src/mongo/db/pipeline/sequential_document_cache.cpp b/src/mongo/db/pipeline/sequential_document_cache.cpp
new file mode 100644
index 00000000000..93d95c0e072
--- /dev/null
+++ b/src/mongo/db/pipeline/sequential_document_cache.cpp
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/sequential_document_cache.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+void SequentialDocumentCache::add(Document doc) {
+ invariant(_status == CacheStatus::kBuilding);
+
+ if (checkCacheSize(doc) != CacheStatus::kAbandoned) {
+ _sizeBytes += doc.getApproximateSize();
+ _cache.push_back(std::move(doc));
+ }
+}
+
+void SequentialDocumentCache::freeze() {
+ invariant(_status == CacheStatus::kBuilding);
+
+ _status = CacheStatus::kServing;
+ _cache.shrink_to_fit();
+
+ _cacheIter = _cache.begin();
+}
+
+void SequentialDocumentCache::abandon() {
+ _status = CacheStatus::kAbandoned;
+
+ _cache.clear();
+ _cache.shrink_to_fit();
+
+ _cacheIter = _cache.begin();
+}
+
+boost::optional<Document> SequentialDocumentCache::getNext() {
+ invariant(_status == CacheStatus::kServing);
+
+ if (_cacheIter == _cache.end()) {
+ return boost::none;
+ }
+
+ return *_cacheIter++;
+}
+
+void SequentialDocumentCache::restartIteration() {
+ invariant(_status == CacheStatus::kServing);
+ _cacheIter = _cache.begin();
+}
+
+SequentialDocumentCache::CacheStatus SequentialDocumentCache::checkCacheSize(const Document& doc) {
+ if (_sizeBytes + doc.getApproximateSize() > _maxSizeBytes) {
+ abandon();
+ }
+
+ return _status;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/sequential_document_cache.h b/src/mongo/db/pipeline/sequential_document_cache.h
new file mode 100644
index 00000000000..625369d934c
--- /dev/null
+++ b/src/mongo/db/pipeline/sequential_document_cache.h
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+#include <stddef.h>
+#include <vector>
+
+#include "mongo/db/pipeline/document.h"
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+/**
+ * Implements a sequential cache of Documents, up to an optional maximum size. Can be in one of
+ * three states: building, serving, or abandoned. See SequentialDocumentCache::CacheStatus.
+ */
+class SequentialDocumentCache {
+ MONGO_DISALLOW_COPYING(SequentialDocumentCache);
+
+public:
+ explicit SequentialDocumentCache(size_t maxCacheSizeBytes) : _maxSizeBytes(maxCacheSizeBytes) {}
+
+ SequentialDocumentCache(SequentialDocumentCache&& moveFrom)
+ : _status(moveFrom._status),
+ _maxSizeBytes(moveFrom._maxSizeBytes),
+ _sizeBytes(moveFrom._sizeBytes),
+ _cacheIter(std::move(moveFrom._cacheIter)),
+ _cache(std::move(moveFrom._cache)) {}
+
+ SequentialDocumentCache& operator=(SequentialDocumentCache&& moveFrom) {
+ _cacheIter = std::move(moveFrom._cacheIter);
+ _maxSizeBytes = moveFrom._maxSizeBytes;
+ _cache = std::move(moveFrom._cache);
+ _sizeBytes = moveFrom._sizeBytes;
+ _status = moveFrom._status;
+
+ return *this;
+ }
+
+ /**
+ * Defines the states that the cache may be in at any given time.
+ */
+ enum class CacheStatus {
+ // The cache is being filled. More documents may be added. A newly instantiated cache is in
+ // this state by default.
+ kBuilding,
+
+ // The caller has invoked freeze() to indicate that no more Documents need to be added. The
+ // cache is read-only at this point.
+ kServing,
+
+ // The maximum permitted cache size has been exceeded, or the caller has explicitly
+ // abandoned the cache. Cannot add more documents or call getNext.
+ kAbandoned,
+ };
+
+ /**
+ * Adds a document to the back of the cache. May only be called while the cache is in
+ * 'kBuilding' mode.
+ */
+ void add(Document doc);
+
+ /**
+ * Moves the cache into 'kServing' (read-only) mode, and attempts to release any excess
+ * allocated memory. May only be called while the cache is in 'kBuilding' mode.
+ */
+ void freeze();
+
+ /**
+ * Abandons the cache, marking it as 'kAbandoned' and freeing any memory allocated while
+ * building.
+ */
+ void abandon();
+
+ /**
+ * Returns the next Document in sequence from the cache, or boost::none if the end of the cache
+ * has been reached. May only be called while in 'kServing' mode.
+ */
+ boost::optional<Document> getNext();
+
+ /**
+ * Resets the cache iterator to the beginning of the cache. May only be called while the cache
+ * is in 'kServing' mode.
+ */
+ void restartIteration();
+
+ CacheStatus status() const {
+ return _status;
+ }
+
+ size_t sizeBytes() const {
+ return _sizeBytes;
+ }
+
+ size_t maxSizeBytes() const {
+ return _maxSizeBytes;
+ }
+
+ size_t count() const {
+ return _cache.size();
+ }
+
+ bool isBuilding() const {
+ return _status == CacheStatus::kBuilding;
+ }
+
+ bool isServing() const {
+ return _status == CacheStatus::kServing;
+ }
+
+ bool isAbandoned() const {
+ return _status == CacheStatus::kAbandoned;
+ }
+
+private:
+ CacheStatus checkCacheSize(const Document& doc);
+
+ CacheStatus _status = CacheStatus::kBuilding;
+ size_t _maxSizeBytes = 0;
+ size_t _sizeBytes = 0;
+
+ std::vector<Document>::iterator _cacheIter;
+ std::vector<Document> _cache;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/sequential_document_cache_test.cpp b/src/mongo/db/pipeline/sequential_document_cache_test.cpp
new file mode 100644
index 00000000000..5372a80ecfe
--- /dev/null
+++ b/src/mongo/db/pipeline/sequential_document_cache_test.cpp
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/sequential_document_cache.h"
+
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+const size_t kCacheSizeBytes = 1024;
+
+TEST(SequentialDocumentCacheTest, CacheIsInBuildingModeUponInstantiation) {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+}
+
+TEST(SequentialDocumentCacheTest, CanAddDocumentsToCacheWhileBuilding) {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+
+ ASSERT_EQ(cache.count(), 2ul);
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotIterateCacheWhileBuilding, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.getNext();
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotRestartCacheIterationWhileBuilding, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.restartIteration();
+}
+
+TEST(SequentialDocumentCacheTest, CanIterateCacheAfterFreezing) {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+
+ ASSERT_EQ(cache.count(), 2ul);
+
+ cache.freeze();
+
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0));
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1));
+ ASSERT_FALSE(cache.getNext().is_initialized());
+}
+
+TEST(SequentialDocumentCacheTest, CanRestartCacheIterationAfterFreezing) {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+
+ ASSERT_EQ(cache.count(), 2ul);
+
+ cache.freeze();
+
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0));
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1));
+ ASSERT_FALSE(cache.getNext().is_initialized());
+
+ cache.restartIteration();
+
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0));
+ ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1));
+ ASSERT_FALSE(cache.getNext().is_initialized());
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotAddDocumentsToCacheAfterFreezing, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ cache.freeze();
+
+ ASSERT(cache.isServing());
+
+ cache.add(DOC("_id" << 0));
+}
+
+TEST(SequentialDocumentCacheTest, ShouldAbandonCacheIfMaxSizeBytesExceeded) {
+ SequentialDocumentCache cache(0);
+ ASSERT(cache.isBuilding());
+
+ cache.add(DOC("_id" << 0));
+
+ ASSERT(cache.isAbandoned());
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotAddDocumentsToAbandonedCache, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ cache.abandon();
+
+ cache.add(DOC("_id" << 0));
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotFreezeCacheWhenAbandoned, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ cache.abandon();
+
+ cache.freeze();
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotRestartCacheIterationWhenAbandoned, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.abandon();
+
+ cache.restartIteration();
+}
+
+DEATH_TEST(SequentialDocumentCacheTest, CannotCallGetNextWhenAbandoned, "invariant") {
+ SequentialDocumentCache cache(kCacheSizeBytes);
+ ASSERT(cache.isBuilding());
+
+ cache.add(DOC("_id" << 0));
+ cache.add(DOC("_id" << 1));
+
+ cache.abandon();
+
+ cache.getNext();
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h
index 26127f3d241..f49211a13ac 100644
--- a/src/mongo/db/pipeline/stub_mongod_interface.h
+++ b/src/mongo/db/pipeline/stub_mongod_interface.h
@@ -93,7 +93,13 @@ public:
StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx) override {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/variables.cpp b/src/mongo/db/pipeline/variables.cpp
index ac713a37f5a..fc7983dbf5e 100644
--- a/src/mongo/db/pipeline/variables.cpp
+++ b/src/mongo/db/pipeline/variables.cpp
@@ -154,7 +154,9 @@ Variables::Id VariablesParseState::defineVariable(StringData name) {
Variables::kBuiltinVarNameToId.find(name) == Variables::kBuiltinVarNameToId.end());
Variables::Id id = _idGenerator->generateId();
- _variables[name] = id;
+ invariant(id > _lastSeen);
+
+ _variables[name] = _lastSeen = id;
return id;
}
@@ -176,4 +178,14 @@ Variables::Id VariablesParseState::getVariable(StringData name) const {
uassert(17276, str::stream() << "Use of undefined variable: " << name, name == "CURRENT");
return Variables::kRootId;
}
+
+std::set<Variables::Id> VariablesParseState::getDefinedVariableIDs() const {
+ std::set<Variables::Id> ids;
+
+ for (auto&& keyId : _variables) {
+ ids.insert(keyId.second);
+ }
+
+ return ids;
+}
}
diff --git a/src/mongo/db/pipeline/variables.h b/src/mongo/db/pipeline/variables.h
index 4e487e811f4..0d8dd403c36 100644
--- a/src/mongo/db/pipeline/variables.h
+++ b/src/mongo/db/pipeline/variables.h
@@ -45,7 +45,8 @@ public:
using Id = int64_t;
/**
- * Generates Variables::Id and keeps track of the number of Ids handed out.
+ * Generates Variables::Id and keeps track of the number of Ids handed out. Each successive Id
+ * generated by an instance of this class must be greater than all preceding Ids.
*/
class IdGenerator {
public:
@@ -141,11 +142,23 @@ public:
Variables::Id defineVariable(StringData name);
/**
+ * Returns true if there are any variables defined in this scope.
+ */
+ bool hasDefinedVariables() const {
+ return !_variables.empty();
+ }
+
+ /**
* Returns the current Id for a variable. uasserts if the variable isn't defined.
*/
Variables::Id getVariable(StringData name) const;
/**
+ * Returns the set of variable IDs defined at this scope.
+ */
+ std::set<Variables::Id> getDefinedVariableIDs() const;
+
+ /**
* Return a copy of this VariablesParseState. Will replace the copy's '_idGenerator' pointer
* with 'idGenerator'.
*/
@@ -160,6 +173,7 @@ private:
Variables::IdGenerator* _idGenerator;
StringMap<Variables::Id> _variables;
+ Variables::Id _lastSeen = -1;
};
} // namespace mongo