diff options
Diffstat (limited to 'src/mongo/db/pipeline')
27 files changed, 1540 insertions, 108 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index f735ce230b4..6266ae8a199 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -150,9 +150,11 @@ env.CppUnitTest( 'document_source_sort_test.cpp', 'document_source_test.cpp', 'document_source_unwind_test.cpp', + 'sequential_document_cache_test.cpp', ], LIBDEPS=[ 'document_source', + 'document_source_facet', 'document_source_lookup', 'document_value_test_util', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', @@ -255,11 +257,13 @@ docSourceEnv.Library( 'document_source_replace_root.cpp', 'document_source_sample.cpp', 'document_source_sample_from_random_cursor.cpp', + 'document_source_sequential_document_cache.cpp', 'document_source_single_document_transformation.cpp', 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', 'document_source_unwind.cpp', + 'sequential_document_cache.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver', diff --git a/src/mongo/db/pipeline/dependencies.h b/src/mongo/db/pipeline/dependencies.h index 19cf3ee33f8..133fc1d9893 100644 --- a/src/mongo/db/pipeline/dependencies.h +++ b/src/mongo/db/pipeline/dependencies.h @@ -33,6 +33,7 @@ #include <string> #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/variables.h" namespace mongo { class ParsedDeps; @@ -60,6 +61,16 @@ struct DepsTracker { return fields.empty() && !needWholeDocument && !_needTextScore; } + /** + * Returns 'true' if any of the DepsTracker's variables appear in the passed 'ids' set. + */ + bool hasVariableReferenceTo(const std::set<Variables::Id>& ids) const { + std::vector<Variables::Id> match; + std::set_intersection( + vars.begin(), vars.end(), ids.begin(), ids.end(), std::back_inserter(match)); + return !match.empty(); + } + MetadataAvailable getMetadataAvailable() const { return _metadataAvailable; } @@ -91,9 +102,10 @@ struct DepsTracker { _needSortKey = needSortKey; } - std::set<std::string> fields; // The names of needed fields in dotted notation. - bool needWholeDocument = false; // If true, ignore 'fields' and assume the whole document is - // needed. + std::set<std::string> fields; // Names of needed fields in dotted notation. + std::set<Variables::Id> vars; // IDs of referenced variables. + bool needWholeDocument = false; // If true, ignore 'fields'; the whole document is needed. + private: /** * Appends the meta projections for the sort key and/or text score to 'bb' if necessary. Returns diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index a9d9f74181a..d51ad84ab67 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -32,6 +32,7 @@ #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/value.h" @@ -163,7 +164,15 @@ splitMatchByModifiedFields(const boost::intrusive_ptr<DocumentSourceMatch>& matc Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { - invariant(*itr == this && (std::next(itr) != container->end())); + invariant(*itr == this); + + // If we are at the end of the pipeline, only optimize in the special case of a cache stage. + if (std::next(itr) == container->end()) { + return dynamic_cast<DocumentSourceSequentialDocumentCache*>(this) + ? doOptimizeAt(itr, container) + : container->end(); + } + auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) { // We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 462afbecb2f..eba4e2acc4b 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -568,6 +568,19 @@ public: enum class CurrentOpUserMode { kIncludeAll, kExcludeOthers }; enum class CurrentOpTruncateMode { kNoTruncation, kTruncateOps }; + struct MakePipelineOptions { + MakePipelineOptions(){}; + + bool optimize = true; + bool attachCursorSource = true; + + // Ordinarily, a MongodInterface is injected into the pipeline at the point when the + // cursor source is added. If true, 'forceInjectMongod' will inject MongodInterfaces + // into the pipeline even if 'attachCursorSource' is false. If 'attachCursorSource' is + // true, then the value of 'forceInjectMongod' is irrelevant. + bool forceInjectMongod = false; + }; + virtual ~MongodInterface(){}; /** @@ -633,16 +646,30 @@ public: const std::list<BSONObj>& originalIndexes) = 0; /** - * Parses a Pipeline from a vector of BSONObjs representing DocumentSources and readies it - * for execution. The returned pipeline is optimized and has a cursor source prepared. + * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of + * the returned pipeline will depend upon the supplied MakePipelineOptions: + * - The boolean opts.optimize determines whether the pipeline will be optimized. + * - If opts.attachCursorSource is false, the pipeline will be returned without attempting + * to add an initial cursor source. + * - If opts.forceInjectMongod is true, then a MongodInterface will be provided to each + * stage which requires one, regardless of whether a cursor source is attached to the + * pipeline. * * This function returns a non-OK status if parsing the pipeline failed. - * NamespaceNotFound will be returned if ExpressionContext has a UUID and that UUID doesn't - * exist anymore. That should be the only case where NamespaceNotFound gets returned. */ virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) = 0; + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}) = 0; + + /** + * Attaches a cursor source to the start of a pipeline. Performs no further optimization. + * This function asserts if the collection to be aggregated is sharded. NamespaceNotFound + * will be returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. + * That should be the only case where NamespaceNotFound is returned. + */ + virtual Status attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** * Returns a vector of owned BSONObjs, each of which contains details of an in-progress diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 133cffec12c..72e236ba870 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -236,18 +236,30 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (!pipeline.isOK()) { return pipeline.getStatus(); } - pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults)); - pipeline.getValue()->optimizePipeline(); + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + if (opts.attachCursorSource) { + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + } return pipeline; } + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); + return Status::OK(); + } + private: deque<DocumentSource::GetNextResult> _mockResults; }; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 3c8cf9db359..c7f82678eb1 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -256,15 +256,19 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { + const bool scopeHasVariables = pExpCtx->variablesParseState.hasDefinedVariables(); for (auto&& facet : _facets) { auto subDepsTracker = facet.pipeline->getDependencies(deps->getMetadataAvailable()); deps->fields.insert(subDepsTracker.fields.begin(), subDepsTracker.fields.end()); + deps->vars.insert(subDepsTracker.vars.begin(), subDepsTracker.vars.end()); deps->needWholeDocument = deps->needWholeDocument || subDepsTracker.needWholeDocument; deps->setNeedTextScore(deps->getNeedTextScore() || subDepsTracker.getNeedTextScore()); - if (deps->needWholeDocument && deps->getNeedTextScore()) { + // If there are variables defined at this stage's scope, there may be dependencies upon + // them in subsequent pipelines. Keep enumerating. + if (deps->needWholeDocument && deps->getNeedTextScore() && !scopeHasVariables) { break; } } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index feba9110eeb..8923aa1ab46 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -63,18 +63,30 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (!pipeline.isOK()) { return pipeline.getStatus(); } - pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_results)); - pipeline.getValue()->optimizePipeline(); + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + if (opts.attachCursorSource) { + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + } return pipeline; } + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) override { + pipeline->addInitialSource(DocumentSourceMock::create(_results)); + return Status::OK(); + } + private: std::deque<DocumentSource::GetNextResult> _results; }; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index b4a41b1b2da..21f3151f59c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -38,6 +38,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value.h" +#include "mongo/db/query/query_knobs.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -106,6 +107,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, _userPipeline = std::move(pipeline); + _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load()); + for (auto&& varElem : letVariables) { const auto varName = varElem.fieldNameStringData(); Variables::uassertValidNameForUserWrite(varName); @@ -203,8 +206,6 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { } auto inputDoc = nextInput.releaseDocument(); - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - resolveLetVariables(inputDoc, &_fromExpCtx->variables); // If we have not absorbed a $unwind, we cannot absorb a $match. If we have absorbed a $unwind, // '_unwindSrc' would be non-null, and we would not have made it here. @@ -217,7 +218,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { _resolvedPipeline.back() = matchStage; } - auto pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + auto pipeline = buildPipeline(inputDoc); std::vector<Value> results; int objsize = 0; @@ -238,6 +239,53 @@ DocumentSource::GetNextResult DocumentSourceLookUp::getNext() { return output.freeze(); } +std::unique_ptr<Pipeline, Pipeline::Deleter> DocumentSourceLookUp::buildPipeline( + const Document& inputDoc) { + // Copy all 'let' variables into the foreign pipeline's expression context. + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + + // Resolve the 'let' variables to values per the given input document. + resolveLetVariables(inputDoc, &_fromExpCtx->variables); + + // If we don't have a cache, build and return the pipeline immediately. + if (!_cache || _cache->isAbandoned()) { + return uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + } + + // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a + // cursor source. If the cache is present and serving, then we will not be adding a cursor + // source later, so inject a mongod interface into all stages that need one. + MongodInterface::MakePipelineOptions pipelineOpts; + + pipelineOpts.optimize = false; + pipelineOpts.attachCursorSource = false; + pipelineOpts.forceInjectMongod = _cache->isServing(); + + // Construct the basic pipeline without a cache stage. + auto pipeline = + uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + + // Add the cache stage at the end and optimize. During the optimization process, the cache will + // either move itself to the correct position in the pipeline, or will abandon itself if no + // suitable cache position exists. + pipeline->addFinalSource( + DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr())); + + pipeline->optimizePipeline(); + + if (!_cache->isServing()) { + // The cache has either been abandoned or has not yet been built. Attach a cursor. + uassertStatusOK(_mongod->attachCursorSourceToPipeline(_fromExpCtx, pipeline.get())); + } + + // If the cache has been abandoned, release it. + if (_cache->isAbandoned()) { + _cache.reset(); + } + + return pipeline; +} + DocumentSource::GetModPathsReturn DocumentSourceLookUp::getModifiedPaths() const { std::set<std::string> modifiedPaths{_as.fullPath()}; if (_unwindSrc) { @@ -488,9 +536,7 @@ DocumentSource::GetNextResult DocumentSourceLookUp::unwindResult() { _pipeline->dispose(pExpCtx->opCtx); } - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - resolveLetVariables(*_input, &_fromExpCtx->variables); - _pipeline = uassertStatusOK(_mongod->makePipeline(_resolvedPipeline, _fromExpCtx)); + _pipeline = buildPipeline(*_input); // The $lookup stage takes responsibility for disposing of its Pipeline, since it will // potentially be used by multiple OperationContexts, and the $lookup stage is part of an @@ -613,9 +659,30 @@ void DocumentSourceLookUp::serializeToArray( DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { if (wasConstructedWithPipelineSyntax()) { + // Copy all 'let' variables into the foreign pipeline's expression context. + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + + auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + + DepsTracker subDeps(deps->getMetadataAvailable()); + + // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables + // declared by this $lookup and variables declared externally. + for (auto&& source : pipeline->getSources()) { + source->getDependencies(&subDeps); + } + + // Add the 'let' dependencies to the tracker. Because the caller is only interested in + // references to external variables, filter out any subpipeline references to 'let' + // variables declared by this $lookup. for (auto&& letVar : _letVariables) { letVar.expression->addDependencies(deps); + subDeps.vars.erase(letVar.id); } + + // Add sub-pipeline variable dependencies. Do not add field dependencies, since these refer + // to the fields from the foreign collection rather than the local collection. + deps->vars.insert(subDeps.vars.begin(), subDeps.vars.end()); } else { deps->fields.insert(_localField->fullPath()); } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 3645223bea7..9727d0e92db 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" @@ -126,6 +127,15 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + static boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + size_t maxCacheSizeBytes) { + auto dsLookup = createFromBson(elem, pExpCtx); + static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes); + return dsLookup; + } + /** * Builds the BSONObj used to query the foreign collection and wraps it in a $match. */ @@ -158,6 +168,10 @@ public: return _variablesParseState; } + std::unique_ptr<Pipeline, Pipeline::Deleter> getSubPipeline_forTest(const Document& inputDoc) { + return buildPipeline(inputDoc); + } + protected: void doDispose() final; @@ -229,11 +243,28 @@ private: void resolveLetVariables(const Document& localDoc, Variables* variables); /** + * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a + * cursor and/or cache source as appropriate. + */ + std::unique_ptr<Pipeline, Pipeline::Deleter> buildPipeline(const Document& inputDoc); + + /** * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that * is executed in that it will not include optimizations or resolved views. */ std::string getUserPipelineDefinition(); + /** + * Reinitialize the cache with a new max size. May only be called if this DSLookup was created + * with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added + * to it. + */ + void reInitializeCache(size_t maxCacheSizeBytes) { + invariant(wasConstructedWithPipelineSyntax()); + invariant(!_cache || (_cache->isBuilding() && _cache->sizeBytes() == 0)); + _cache.emplace(maxCacheSizeBytes); + } + NamespaceString _fromNs; NamespaceString _resolvedNs; FieldPath _as; @@ -249,6 +280,12 @@ private: Variables _variables; VariablesParseState _variablesParseState; + // Caches documents returned by the non-correlated prefix of the $lookup pipeline during the + // first iteration, up to a specified size limit in bytes. If this limit is not exceeded by the + // time we hit EOF, subsequent iterations of the pipeline will draw from the cache rather than + // from a cursor source. + boost::optional<SequentialDocumentCache> _cache; + // The ExpressionContext used when performing aggregation pipelines against the '_resolvedNs' // namespace. boost::intrusive_ptr<ExpressionContext> _fromExpCtx; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 66c408aa0d2..d60e8b1a6e6 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -88,18 +88,30 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (!pipeline.isOK()) { return pipeline.getStatus(); } - pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults)); - pipeline.getValue()->optimizePipeline(); + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + if (opts.attachCursorSource) { + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + } return pipeline; } + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); + return Status::OK(); + } + private: deque<DocumentSource::GetNextResult> _mockResults; }; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 2f7ac7852e4..dcbc2b55814 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/stub_mongod_interface.h" #include "mongo/db/pipeline/value.h" +#include "mongo/db/query/query_knobs.h" namespace mongo { namespace { @@ -53,6 +54,8 @@ using std::vector; // This provides access to getExpCtx(), but we'll use a different name for this test suite. using DocumentSourceLookUpTest = AggregationContextFixture; +const long long kDefaultMaxCacheSize = internalDocumentSourceLookupCacheSizeBytes.load(); +const auto kExplain = ExplainOptions::Verbosity::kQueryPlanner; // A 'let' variable defined in a $lookup stage is expected to be available to all sub-pipelines. For // sub-pipelines below the immediate one, they are passed to via ExpressionContext. This test @@ -168,7 +171,6 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) { ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax()); } - TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedNamespaces) { auto stageSpec = BSON("$lookup" << BSON("from" @@ -407,12 +409,17 @@ TEST(MakeMatchStageFromInput, ArrayValueWithRegexUsesOrQuery) { // /** - * A mock MongodInterface which allows mocking a foreign pipeline. + * A mock MongodInterface which allows mocking a foreign pipeline. If 'removeLeadingQueryStages' is + * true then any $match, $sort or $project fields at the start of the pipeline will be removed, + * simulating the pipeline changes which occur when PipelineD::prepareCursorSource absorbs stages + * into the PlanExecutor. */ class MockMongodInterface final : public StubMongodInterface { public: - MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults) - : _mockResults(std::move(mockResults)) {} + MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults, + bool removeLeadingQueryStages = false) + : _mockResults(std::move(mockResults)), + _removeLeadingQueryStages(removeLeadingQueryStages) {} bool isSharded(const NamespaceString& ns) final { return false; @@ -420,20 +427,42 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (!pipeline.isOK()) { return pipeline.getStatus(); } - pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults)); - pipeline.getValue()->optimizePipeline(); + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + if (opts.attachCursorSource) { + uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + } return pipeline; } + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { + if (pipeline->popFrontStageWithName("$match") || + pipeline->popFrontStageWithName("$sort") || + pipeline->popFrontStageWithName("$project")) { + continue; + } + break; + } + + pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); + return Status::OK(); + } + private: deque<DocumentSource::GetNextResult> _mockResults; + bool _removeLeadingQueryStages = false; }; TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { @@ -590,5 +619,381 @@ TEST_F(DocumentSourceLookUpTest, LookupReportsFieldsModifiedByAbsorbedUnwind) { lookup->dispose(); } +BSONObj sequentialCacheStageObj(const StringData status = "kBuilding"_sd, + const long long maxSizeBytes = kDefaultMaxCacheSize) { + return BSON("$sequentialCache" << BSON("maxSizeBytes" << maxSizeBytes << "status" << status)); +} + +TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, " + "{$addFields: {varField: '$$var1'}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + auto expectedPipe = + fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, " + << sequentialCacheStageObj() + << ", {$addFields: {varField: {$const: 5} }}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, + ShouldDiscoverVariablesReferencedInFacetPipelineAfterAnExhaustiveAllStage) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // In the $facet stage here, the correlated $match stage comes after a $group stage which + // returns EXHAUSTIVE_ALL for its dependencies. Verify that we continue enumerating the $facet + // pipeline's variable dependencies after this point, so that the $facet stage is correctly + // identified as correlated and the cache is placed before it in the $lookup pipeline. + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, " + "{$facet: {facetPipe: [{$group: {_id: '$_id'}}, {$match: {$expr: {$eq: ['$_id', " + "'$$var1']}}}]}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + // TODO SERVER-30991: $match within $facet should optimize to $const. + auto expectedPipe = + fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, " + << sequentialCacheStageObj() + << ", {$facet: {facetPipe: [{$group: {_id: '$_id'}}, {$match: " + "{$expr: {$eq: ['$_id', '$$var1']}}}]}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, + ShouldIgnoreLocalVariablesShadowingLetVariablesWhenFindingNonCorrelatedPrefix) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // The $project stage defines a local variable with the same name as the $lookup 'let' variable. + // Verify that the $project is identified as non-correlated and the cache is placed after it. + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: 1}}, {$sort: {x: 1}}, " + "{$project: {_id: false, projectedField: {$let: {vars: {var1: 'abc'}, in: " + "'$$var1'}}}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', " + "as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + auto expectedPipe = fromjson( + str::stream() + << "[{mock: {}}, {$match: {x: 1}}, {$sort: {sortKey: {x: 1}}}, {$project: {_id: false, " + "projectedField: {$let: {vars: {var1: {$const: 'abc'}}, in: '$$var1'}}}}," + << sequentialCacheStageObj() + << ", {$addFields: {varField: {$sum: ['$x', {$const: 5}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // Create a $lookup stage whose pipeline contains nested $lookups. The third-level $lookup + // refers to a 'let' variable defined in the top-level $lookup. Verify that the second-level + // $lookup is correctly identified as a correlated stage and the cache is placed before it. + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {from: 'coll', as: 'as', let: {var1: '$_id'}, pipeline: [{$match: " + "{x:1}}, {$sort: {x: 1}}, {$lookup: {from: 'coll', as: 'subas', pipeline: " + "[{$match: {x: 1}}, {$lookup: {from: 'coll', as: 'subsubas', pipeline: [{$match: " + "{$expr: {$eq: ['$y', '$$var1']}}}]}}]}}, {$addFields: {varField: '$$var1'}}]}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + auto expectedPipe = + fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, " + << sequentialCacheStageObj() + << ", {$lookup: {from: 'coll', as: 'subas', let: {}, pipeline: " + "[{$match: {x: 1}}, {$lookup: {from: 'coll', as: 'subsubas', " + "pipeline: [{$match: {$expr: {$eq: ['$y', '$$var1']}}}]}}]}}, " + "{$addFields: {varField: {$const: 5}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, + ShouldIgnoreNestedLookupLetVariablesShadowingOuterLookupLetVariablesWhenFindingPrefix) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // The nested $lookup stage defines a 'let' variable with the same name as the top-level 'let'. + // Verify the nested $lookup is identified as non-correlated and the cache is placed after it. + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, " + "{$lookup: {let: {var1: '$y'}, pipeline: [{$match: {$expr: { $eq: ['$z', " + "'$$var1']}}}], from: 'coll', as: 'subas'}}, {$addFields: {varField: '$$var1'}}], " + "from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + auto expectedPipe = + fromjson(str::stream() << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, " + "{$lookup: {from: 'coll', as: 'subas', let: {var1: '$y'}, " + "pipeline: [{$match: {$expr: { $eq: ['$z', '$$var1']}}}]}}, " + << sequentialCacheStageObj() + << ", {$addFields: {varField: {$const: 5} }}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, ShouldCacheEntirePipelineIfNonCorrelated) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, {$lookup: " + "{pipeline: [{$match: {y: 5}}], from: 'coll', as: 'subas'}}, {$addFields: " + "{constField: 5}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + lookupStage->injectMongodInterface( + std::shared_ptr<MockMongodInterface>(new MockMongodInterface({}))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); + ASSERT(subPipeline); + + auto expectedPipe = + fromjson(str::stream() + << "[{mock: {}}, {$match: {x:1}}, {$sort: {sortKey: {x: 1}}}, {$lookup: {from: " + "'coll', as: 'subas', let: {}, pipeline: [{$match: {y: 5}}]}}, {$addFields: " + "{constField: {$const: 5}}}, " + << sequentialCacheStageObj() + << "]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + +TEST_F(DocumentSourceLookUpTest, + ShouldReplaceNonCorrelatedPrefixWithCacheAfterFirstSubPipelineIteration) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson( + "{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: " + "1}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + // Prepare the mocked local and foreign sources. + auto mockLocalSource = DocumentSourceMock::create( + {Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}); + + lookupStage->setSource(mockLocalSource.get()); + + deque<DocumentSource::GetNextResult> mockForeignContents{ + Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; + + lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents)); + + // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); + ASSERT(subPipeline); + + auto expectedPipe = fromjson( + str::stream() << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, " + << sequentialCacheStageObj("kBuilding") + << ", {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); + + // Verify the first result (non-cached) from the $lookup, for local document {_id: 0}. + auto nonCachedResult = lookupStage->getNext(); + ASSERT(nonCachedResult.isAdvanced()); + ASSERT_DOCUMENT_EQ( + Document{fromjson( + "{_id: 0, as: [{x: 0, varField: 0}, {x: 1, varField: 1}, {x: 2, varField: 2}]}")}, + nonCachedResult.getDocument()); + + // Preview the subpipeline that will be used to process the second local document {_id: 1}. The + // sub-pipeline cache has been built on the first iteration, and is now serving in place of the + // mocked foreign input source and the non-correlated stages at the start of the pipeline. + subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 1)); + ASSERT(subPipeline); + + expectedPipe = + fromjson(str::stream() << "[" << sequentialCacheStageObj("kServing") + << ", {$addFields: {varField: {$sum: ['$x', {$const: 1}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); + + // Verify that the rest of the results are correctly constructed from the cache. + auto cachedResult = lookupStage->getNext(); + ASSERT(cachedResult.isAdvanced()); + ASSERT_DOCUMENT_EQ( + Document{fromjson( + "{_id: 1, as: [{x: 0, varField: 1}, {x: 1, varField: 2}, {x: 2, varField: 3}]}")}, + cachedResult.getDocument()); + + cachedResult = lookupStage->getNext(); + ASSERT(cachedResult.isAdvanced()); + ASSERT_DOCUMENT_EQ( + Document{fromjson( + "{_id: 2, as: [{x: 0, varField: 2}, {x: 1, varField: 3}, {x: 2, varField: 4}]}")}, + cachedResult.getDocument()); +} + +TEST_F(DocumentSourceLookUpTest, + ShouldAbandonCacheIfMaxSizeIsExceededAfterFirstSubPipelineIteration) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + // Ensure the cache is abandoned after the first iteration by setting its max size to 0. + size_t maxCacheSizeBytes = 0; + auto docSource = DocumentSourceLookUp::createFromBsonWithCacheSize( + fromjson( + "{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: " + "1}}, {$addFields: {varField: {$sum: ['$x', '$$var1']}}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx, + maxCacheSizeBytes); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + // Prepare the mocked local and foreign sources. + auto mockLocalSource = DocumentSourceMock::create({Document{{"_id", 0}}, Document{{"_id", 1}}}); + + lookupStage->setSource(mockLocalSource.get()); + + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}}, + Document{{"x", 1}}}; + + lookupStage->injectMongodInterface(std::make_shared<MockMongodInterface>(mockForeignContents)); + + // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); + ASSERT(subPipeline); + + auto expectedPipe = fromjson( + str::stream() << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, " + << sequentialCacheStageObj("kBuilding", 0ll) + << ", {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); + + // Get the first result from the stage, for local document {_id: 0}. + auto firstResult = lookupStage->getNext(); + ASSERT_DOCUMENT_EQ( + Document{fromjson("{_id: 0, as: [{x: 0, varField: 0}, {x: 1, varField: 1}]}")}, + firstResult.getDocument()); + + // Preview the subpipeline that will be used to process the second local document {_id: 1}. The + // sub-pipeline cache exceeded its max size on the first iteration, was abandoned, and is now + // absent from the pipeline. + subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 1)); + ASSERT(subPipeline); + + expectedPipe = fromjson(str::stream() + << "[{mock: {}}, {$match: {x: {$gte: 0}}}, {$sort: {sortKey: {x: 1}}}, " + "{$addFields: {varField: {$sum: ['$x', {$const: 1}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); + + // Verify that the second document is constructed correctly without the cache. + auto secondResult = lookupStage->getNext(); + + ASSERT_DOCUMENT_EQ( + Document{fromjson("{_id: 1, as: [{x: 0, varField: 1}, {x: 1, varField: 2}]}")}, + secondResult.getDocument()); +} + +TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPlanExecutor) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: { $gte: ['$x', " + "'$$var1']}}}, {$sort: {x: 1}}, {$addFields: {varField: {$sum: ['$x', " + "'$$var1']}}}], from: 'coll', as: 'as'}}") + .firstElement(), + expCtx); + + auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); + ASSERT(lookupStage); + + const bool removeLeadingQueryStages = true; + + lookupStage->injectMongodInterface(std::shared_ptr<MockMongodInterface>( + new MockMongodInterface({}, removeLeadingQueryStages))); + + auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); + ASSERT(subPipeline); + + auto expectedPipe = + fromjson("[{mock: {}}, {$addFields: {varField: {$sum: ['$x', {$const: 0}]}}}]"); + + ASSERT_VALUE_EQ(Value(subPipeline->writeExplainOps(kExplain)), Value(BSONArray(expectedPipe))); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index 84e35a01c82..f50faf85749 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -64,7 +64,14 @@ Value DocumentSourceMatch::serialize(boost::optional<ExplainOptions::Verbosity> } intrusive_ptr<DocumentSource> DocumentSourceMatch::optimize() { - return getQuery().isEmpty() ? nullptr : this; + if (getQuery().isEmpty()) { + return nullptr; + } + + // TODO SERVER-30991: thread optimization down to the MatchExpression. + //_expression->optimize(); + + return this; } DocumentSource::GetNextResult DocumentSourceMatch::getNext() { @@ -471,6 +478,9 @@ BSONObj DocumentSourceMatch::getQuery() const { } DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* deps) const { + // Get all field or variable dependencies. + _expression->addDependencies(deps); + if (isTextQuery()) { // A $text aggregation field should return EXHAUSTIVE_FIELDS, since we don't necessarily // know what field it will be searching without examining indices. @@ -479,7 +489,6 @@ DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* return EXHAUSTIVE_FIELDS; } - _expression->addDependencies(deps); return SEE_NEXT; } diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp new file mode 100644 index 00000000000..7eb927399dd --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.cpp @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_sequential_document_cache.h" + +#include "mongo/db/pipeline/document_source_match.h" + +namespace mongo { + +constexpr StringData DocumentSourceSequentialDocumentCache::kStageName; + +DocumentSourceSequentialDocumentCache::DocumentSourceSequentialDocumentCache( + const boost::intrusive_ptr<ExpressionContext>& expCtx, SequentialDocumentCache* cache) + : DocumentSource(expCtx), _cache(cache) { + invariant(_cache); + invariant(!_cache->isAbandoned()); + + if (_cache->isServing()) { + _cache->restartIteration(); + } +} + +DocumentSource::GetNextResult DocumentSourceSequentialDocumentCache::getNext() { + // Either we're reading from the cache, or we have an input source to build the cache from. + invariant(pSource || _cache->isServing()); + + pExpCtx->checkForInterrupt(); + + if (_cache->isServing()) { + auto nextDoc = _cache->getNext(); + return (nextDoc ? std::move(*nextDoc) : GetNextResult::makeEOF()); + } + + auto nextResult = pSource->getNext(); + + if (!_cache->isAbandoned()) { + if (nextResult.isEOF()) { + _cache->freeze(); + } else { + _cache->add(nextResult.getDocument()); + } + } + + return nextResult; +} + +Pipeline::SourceContainer::iterator DocumentSourceSequentialDocumentCache::doOptimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + // The DocumentSourceSequentialDocumentCache should always be the last stage in the pipeline + // pre-optimization. By the time optimization reaches this point, all preceding stages are in + // the final positions which they would have occupied if no cache stage was present. + invariant(_hasOptimizedPos || std::next(itr) == container->end()); + invariant((*itr).get() == this); + + // If we have already optimized our position, stay where we are. + if (_hasOptimizedPos) { + return std::next(itr); + } + + // Mark this stage as having optimized itself. + _hasOptimizedPos = true; + + // If the cache is the only stage in the pipeline, return immediately. + if (itr == container->begin()) { + return container->end(); + } + + // Pop the cache stage off the back of the pipeline. + auto cacheStage = std::move(*itr); + container->erase(itr); + + // Get all variable IDs defined in this scope. + auto varIDs = pExpCtx->variablesParseState.getDefinedVariableIDs(); + + auto prefixSplit = container->begin(); + DepsTracker deps; + + // Iterate through the pipeline stages until we find one which references an external variable. + for (; prefixSplit != container->end(); ++prefixSplit) { + (*prefixSplit)->getDependencies(&deps); + + if (deps.hasVariableReferenceTo(varIDs)) { + break; + } + } + + // The 'prefixSplit' iterator is now pointing to the first stage of the correlated suffix. If + // the split point is the first stage, then the entire pipeline is correlated and we should not + // attempt to perform any caching. Abandon the cache and return. + if (prefixSplit == container->begin()) { + _cache->abandon(); + return container->end(); + } + + // If the cache has been populated and is serving results, remove the non-correlated prefix. + if (_cache->isServing()) { + container->erase(container->begin(), prefixSplit); + } + + container->insert(prefixSplit, std::move(cacheStage)); + + return container->end(); +} + +Value DocumentSourceSequentialDocumentCache::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + if (explain) { + return Value(Document{ + {kStageName, + Document{{"maxSizeBytes"_sd, Value(static_cast<long long>(_cache->maxSizeBytes()))}, + {"status"_sd, + _cache->isBuilding() ? "kBuilding"_sd : _cache->isServing() + ? "kServing"_sd + : "kAbandoned"_sd}}}}); + } + + return Value(); +} + +} // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h new file mode 100644 index 00000000000..af96ae8e2ab --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/sequential_document_cache.h" + +namespace mongo { + +/** + * A DocumentSourceSequentialDocumentCache manages an underlying SequentialDocumentCache. If the + * cache's status is 'kBuilding', DocumentSourceSequentialDocumentCache will retrieve documents from + * the preceding pipeline stage, add them to the cache, and pass them through to the following + * pipeline stage. If the cache is in 'kServing' mode, DocumentSourceSequentialDocumentCache will + * return results directly from the cache rather than from a preceding stage. It does not have a + * registered parser and cannot be created from BSON. + */ +class DocumentSourceSequentialDocumentCache final : public DocumentSource { +public: + static constexpr StringData kStageName = "$sequentialCache"_sd; + + const char* getSourceName() const final { + return kStageName.rawData(); + } + + StageConstraints constraints() const { + StageConstraints constraints; + + if (_cache->isServing()) { + constraints.requiredPosition = PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + } + + return constraints; + } + + GetNextResult getNext() final; + + static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, SequentialDocumentCache* cache) { + return new DocumentSourceSequentialDocumentCache(pExpCtx, cache); + } + +protected: + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + +private: + DocumentSourceSequentialDocumentCache(const boost::intrusive_ptr<ExpressionContext>& expCtx, + SequentialDocumentCache* cache); + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + SequentialDocumentCache* _cache; + + bool _hasOptimizedPos = false; +}; + +} // namesace mongo diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp index c5877f5eee1..75b7ba50c53 100644 --- a/src/mongo/db/pipeline/expression.cpp +++ b/src/mongo/db/pipeline/expression.cpp @@ -684,7 +684,7 @@ intrusive_ptr<Expression> ExpressionCoerceToBool::optimize() { return intrusive_ptr<Expression>(this); } -void ExpressionCoerceToBool::addDependencies(DepsTracker* deps) const { +void ExpressionCoerceToBool::_doAddDependencies(DepsTracker* deps) const { pExpression->addDependencies(deps); } @@ -942,7 +942,7 @@ intrusive_ptr<Expression> ExpressionConstant::optimize() { return intrusive_ptr<Expression>(this); } -void ExpressionConstant::addDependencies(DepsTracker* deps) const { +void ExpressionConstant::_doAddDependencies(DepsTracker* deps) const { /* nothing to do */ } @@ -1268,7 +1268,7 @@ Value ExpressionDateFromParts::evaluate(const Document& root) const { MONGO_UNREACHABLE; } -void ExpressionDateFromParts::addDependencies(DepsTracker* deps) const { +void ExpressionDateFromParts::_doAddDependencies(DepsTracker* deps) const { if (_year) { _year->addDependencies(deps); } @@ -1392,7 +1392,7 @@ Value ExpressionDateFromString::evaluate(const Document& root) const { return Value(tzdb->fromString(dateTimeString, timeZone)); } -void ExpressionDateFromString::addDependencies(DepsTracker* deps) const { +void ExpressionDateFromString::_doAddDependencies(DepsTracker* deps) const { _dateString->addDependencies(deps); if (_timeZone) { _timeZone->addDependencies(deps); @@ -1532,7 +1532,7 @@ Value ExpressionDateToParts::evaluate(const Document& root) const { } } -void ExpressionDateToParts::addDependencies(DepsTracker* deps) const { +void ExpressionDateToParts::_doAddDependencies(DepsTracker* deps) const { _date->addDependencies(deps); if (_timeZone) { _timeZone->addDependencies(deps); @@ -1635,7 +1635,7 @@ Value ExpressionDateToString::evaluate(const Document& root) const { return Value(timeZone->formatDate(_format, date.coerceToDate())); } -void ExpressionDateToString::addDependencies(DepsTracker* deps) const { +void ExpressionDateToString::_doAddDependencies(DepsTracker* deps) const { _date->addDependencies(deps); if (_timeZone) { _timeZone->addDependencies(deps); @@ -1739,7 +1739,7 @@ intrusive_ptr<Expression> ExpressionObject::optimize() { return this; } -void ExpressionObject::addDependencies(DepsTracker* deps) const { +void ExpressionObject::_doAddDependencies(DepsTracker* deps) const { for (auto&& pair : _expressions) { pair.second->addDependencies(deps); } @@ -1834,13 +1834,15 @@ intrusive_ptr<Expression> ExpressionFieldPath::optimize() { return intrusive_ptr<Expression>(this); } -void ExpressionFieldPath::addDependencies(DepsTracker* deps) const { +void ExpressionFieldPath::_doAddDependencies(DepsTracker* deps) const { if (_variable == Variables::kRootId) { // includes CURRENT when it is equivalent to ROOT. if (_fieldPath.getPathLength() == 1) { deps->needWholeDocument = true; // need full doc if just "$$ROOT" } else { deps->fields.insert(_fieldPath.tail().fullPath()); } + } else if (Variables::isUserDefinedVariable(_variable)) { + deps->vars.insert(_variable); } } @@ -2041,7 +2043,7 @@ Value ExpressionFilter::evaluate(const Document& root) const { return Value(std::move(output)); } -void ExpressionFilter::addDependencies(DepsTracker* deps) const { +void ExpressionFilter::_doAddDependencies(DepsTracker* deps) const { _input->addDependencies(deps); _filter->addDependencies(deps); } @@ -2129,7 +2131,6 @@ intrusive_ptr<Expression> ExpressionLet::optimize() { it->second.expression = it->second.expression->optimize(); } - // TODO be smarter with constant "variables" _subExpression = _subExpression->optimize(); return this; @@ -2157,17 +2158,16 @@ Value ExpressionLet::evaluate(const Document& root) const { return _subExpression->evaluate(root); } -void ExpressionLet::addDependencies(DepsTracker* deps) const { - for (VariableMap::const_iterator it = _variables.begin(), end = _variables.end(); it != end; - ++it) { - it->second.expression->addDependencies(deps); +void ExpressionLet::_doAddDependencies(DepsTracker* deps) const { + for (auto&& idToNameExp : _variables) { + // Add the external dependencies from the 'vars' statement. + idToNameExp.second.expression->addDependencies(deps); } - // TODO be smarter when CURRENT is a bound variable + // Add subexpression dependencies, which may contain a mix of local and external variable refs. _subExpression->addDependencies(deps); } - /* ------------------------- ExpressionMap ----------------------------- */ REGISTER_EXPRESSION(map, ExpressionMap::parse); @@ -2269,7 +2269,7 @@ Value ExpressionMap::evaluate(const Document& root) const { return Value(std::move(output)); } -void ExpressionMap::addDependencies(DepsTracker* deps) const { +void ExpressionMap::_doAddDependencies(DepsTracker* deps) const { _input->addDependencies(deps); _each->addDependencies(deps); } @@ -2346,7 +2346,7 @@ Value ExpressionMeta::evaluate(const Document& root) const { MONGO_UNREACHABLE; } -void ExpressionMeta::addDependencies(DepsTracker* deps) const { +void ExpressionMeta::_doAddDependencies(DepsTracker* deps) const { if (_metaType == MetaType::TEXT_SCORE) { deps->setNeedTextScore(true); } @@ -2913,7 +2913,7 @@ intrusive_ptr<Expression> ExpressionNary::optimize() { return this; } -void ExpressionNary::addDependencies(DepsTracker* deps) const { +void ExpressionNary::_doAddDependencies(DepsTracker* deps) const { for (auto&& operand : vpOperand) { operand->addDependencies(deps); } @@ -3321,7 +3321,7 @@ intrusive_ptr<Expression> ExpressionReduce::optimize() { return this; } -void ExpressionReduce::addDependencies(DepsTracker* deps) const { +void ExpressionReduce::_doAddDependencies(DepsTracker* deps) const { _input->addDependencies(deps); _initial->addDependencies(deps); _in->addDependencies(deps); @@ -4155,7 +4155,7 @@ boost::intrusive_ptr<Expression> ExpressionSwitch::parse( return expression; } -void ExpressionSwitch::addDependencies(DepsTracker* deps) const { +void ExpressionSwitch::_doAddDependencies(DepsTracker* deps) const { for (auto&& branch : _branches) { branch.first->addDependencies(deps); branch.second->addDependencies(deps); @@ -4420,7 +4420,7 @@ Value ExpressionZip::serialize(bool explain) const { << serializedUseLongestLength))); } -void ExpressionZip::addDependencies(DepsTracker* deps) const { +void ExpressionZip::_doAddDependencies(DepsTracker* deps) const { std::for_each( _inputs.begin(), _inputs.end(), [&deps](intrusive_ptr<Expression> inputExpression) -> void { inputExpression->addDependencies(deps); diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h index ab82e3139e7..6532be87fb0 100644 --- a/src/mongo/db/pipeline/expression.h +++ b/src/mongo/db/pipeline/expression.h @@ -103,11 +103,17 @@ public: } /** - * Add the fields used as input to this expression to 'deps'. - * - * Expressions are trees, so this is often recursive. + * Add the fields and variables used in this expression to 'deps'. References to variables which + * are local to a particular expression will be filtered out of the tracker upon return. */ - virtual void addDependencies(DepsTracker* deps) const = 0; + void addDependencies(DepsTracker* deps) { + _doAddDependencies(deps); + + // Filter out references to any local variables. + if (_boundaryVariableId) { + deps->vars.erase(deps->vars.upper_bound(*_boundaryVariableId), deps->vars.end()); + } + } /** * Serialize the Expression tree recursively. @@ -205,7 +211,12 @@ public: static void registerExpression(std::string key, Parser parser); protected: - Expression(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} + Expression(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) { + auto varIds = _expCtx->variablesParseState.getDefinedVariableIDs(); + if (!varIds.empty()) { + _boundaryVariableId = *std::prev(varIds.end()); + } + } typedef std::vector<boost::intrusive_ptr<Expression>> ExpressionVector; @@ -213,17 +224,18 @@ protected: return _expCtx; } + virtual void _doAddDependencies(DepsTracker* deps) const = 0; + private: + boost::optional<Variables::Id> _boundaryVariableId; boost::intrusive_ptr<ExpressionContext> _expCtx; }; - -/// Inherit from ExpressionVariadic or ExpressionFixedArity instead of directly from this class. +// Inherit from ExpressionVariadic or ExpressionFixedArity instead of directly from this class. class ExpressionNary : public Expression { public: boost::intrusive_ptr<Expression> optimize() override; Value serialize(bool explain) const override; - void addDependencies(DepsTracker* deps) const override; /* Add an operand to the n-ary expression. @@ -260,6 +272,8 @@ protected: explicit ExpressionNary(const boost::intrusive_ptr<ExpressionContext>& expCtx) : Expression(expCtx) {} + void _doAddDependencies(DepsTracker* deps) const override; + ExpressionVector vpOperand; }; @@ -418,7 +432,6 @@ public: class ExpressionConstant final : public Expression { public: boost::intrusive_ptr<Expression> optimize() final; - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; Value serialize(bool explain) const final; @@ -461,6 +474,9 @@ public: return _value; } +protected: + void _doAddDependencies(DepsTracker* deps) const override; + private: ExpressionConstant(const boost::intrusive_ptr<ExpressionContext>& expCtx, const Value& value); @@ -504,13 +520,6 @@ public: return evaluateDate(date, timeZone); } - void addDependencies(DepsTracker* deps) const final { - _date->addDependencies(deps); - if (_timeZone) { - _timeZone->addDependencies(deps); - } - } - /** * Always serializes to the full {date: <date arg>, timezone: <timezone arg>} format, leaving * off the timezone if not specified. @@ -594,6 +603,13 @@ protected: const boost::intrusive_ptr<ExpressionContext>& expCtx) : Expression(expCtx), _opName(opName) {} + void _doAddDependencies(DepsTracker* deps) const final { + _date->addDependencies(deps); + if (_timeZone) { + _timeZone->addDependencies(deps); + } + } + /** * Subclasses should implement this to do their actual date-related logic. Uses 'timezone' to * evaluate the expression against 'data'. If the user did not specify a time zone, 'timezone' @@ -729,7 +745,6 @@ public: class ExpressionCoerceToBool final : public Expression { public: boost::intrusive_ptr<Expression> optimize() final; - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; Value serialize(bool explain) const final; @@ -737,6 +752,9 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<Expression>& pExpression); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionCoerceToBool(const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<Expression>& pExpression); @@ -833,13 +851,15 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document&) const final; - void addDependencies(DepsTracker*) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionDateFromString(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::intrusive_ptr<Expression> dateString, @@ -854,13 +874,15 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionDateFromParts(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::intrusive_ptr<Expression> year, @@ -905,13 +927,15 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: /** * The iso8601 argument controls whether to output ISO8601 elements or natural calendar. @@ -933,13 +957,15 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionDateToString(const boost::intrusive_ptr<ExpressionContext>& expCtx, const std::string& format, // The format string. @@ -1007,7 +1033,6 @@ public: class ExpressionFieldPath final : public Expression { public: boost::intrusive_ptr<Expression> optimize() final; - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; Value serialize(bool explain) const final; @@ -1040,6 +1065,9 @@ public: ComputedPaths getComputedPaths(const std::string& exprFieldPath, Variables::Id renamingVar) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionFieldPath(const boost::intrusive_ptr<ExpressionContext>& expCtx, const std::string& fieldPath, @@ -1073,13 +1101,15 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::string varName, @@ -1177,7 +1207,6 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -1195,6 +1224,9 @@ public: typedef std::map<Variables::Id, NameAndExpression> VariableMap; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionLet(const boost::intrusive_ptr<ExpressionContext>& expCtx, const VariableMap& vars, @@ -1236,7 +1268,6 @@ public: boost::intrusive_ptr<Expression> optimize() final; Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -1246,6 +1277,9 @@ public: ComputedPaths getComputedPaths(const std::string& exprFieldPath, Variables::Id renamingVar) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionMap( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -1264,13 +1298,15 @@ class ExpressionMeta final : public Expression { public: Value serialize(bool explain) const final; Value evaluate(const Document& root) const final; - void addDependencies(DepsTracker* deps) const final; static boost::intrusive_ptr<Expression> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONElement expr, const VariablesParseState& vps); +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: enum MetaType { TEXT_SCORE, @@ -1364,7 +1400,6 @@ public: class ExpressionObject final : public Expression { public: boost::intrusive_ptr<Expression> optimize() final; - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; Value serialize(bool explain) const final; @@ -1391,6 +1426,9 @@ public: ComputedPaths getComputedPaths(const std::string& exprFieldPath, Variables::Id renamingVar) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: ExpressionObject( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -1449,7 +1487,6 @@ public: explicit ExpressionReduce(const boost::intrusive_ptr<ExpressionContext>& expCtx) : Expression(expCtx) {} - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; boost::intrusive_ptr<Expression> optimize() final; static boost::intrusive_ptr<Expression> parse( @@ -1458,6 +1495,9 @@ public: const VariablesParseState& vpsIn); Value serialize(bool explain) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: boost::intrusive_ptr<Expression> _input; boost::intrusive_ptr<Expression> _initial; @@ -1676,7 +1716,6 @@ public: explicit ExpressionSwitch(const boost::intrusive_ptr<ExpressionContext>& expCtx) : Expression(expCtx) {} - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; boost::intrusive_ptr<Expression> optimize() final; static boost::intrusive_ptr<Expression> parse( @@ -1685,6 +1724,9 @@ public: const VariablesParseState& vpsIn); Value serialize(bool explain) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: using ExpressionPair = std::pair<boost::intrusive_ptr<Expression>, boost::intrusive_ptr<Expression>>; @@ -1795,7 +1837,6 @@ public: explicit ExpressionZip(const boost::intrusive_ptr<ExpressionContext>& expCtx) : Expression(expCtx) {} - void addDependencies(DepsTracker* deps) const final; Value evaluate(const Document& root) const final; boost::intrusive_ptr<Expression> optimize() final; static boost::intrusive_ptr<Expression> parse( @@ -1804,6 +1845,9 @@ public: const VariablesParseState& vpsIn); Value serialize(bool explain) const final; +protected: + void _doAddDependencies(DepsTracker* deps) const final; + private: bool _useLongestLength = false; ExpressionVector _inputs; diff --git a/src/mongo/db/pipeline/expression_test.cpp b/src/mongo/db/pipeline/expression_test.cpp index bd3abd1ffc3..048dcbe769d 100644 --- a/src/mongo/db/pipeline/expression_test.cpp +++ b/src/mongo/db/pipeline/expression_test.cpp @@ -2663,6 +2663,68 @@ TEST(ExpressionObjectDependencies, FieldPathsShouldBeAddedToDependencies) { ASSERT_EQ(deps.fields.count("c.d"), 1UL); }; +TEST(ExpressionObjectDependencies, VariablesShouldBeAddedToDependencies) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + auto varID = expCtx->variablesParseState.defineVariable("var1"); + auto fieldPath = ExpressionFieldPath::parse(expCtx, "$$var1", expCtx->variablesParseState); + DepsTracker deps; + fieldPath->addDependencies(&deps); + ASSERT_EQ(deps.vars.size(), 1UL); + ASSERT_EQ(deps.vars.count(varID), 1UL); +} + +TEST(ExpressionObjectDependencies, LocalLetVariablesShouldBeFilteredOutOfDependencies) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + expCtx->variablesParseState.defineVariable("var1"); + auto letSpec = BSON("$let" << BSON("vars" << BSON("var2" + << "abc") + << "in" + << BSON("$multiply" << BSON_ARRAY("$$var1" + << "$$var2")))); + auto expressionLet = + ExpressionLet::parse(expCtx, letSpec.firstElement(), expCtx->variablesParseState); + DepsTracker deps; + expressionLet->addDependencies(&deps); + ASSERT_EQ(deps.vars.size(), 1UL); + ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin()); +} + +TEST(ExpressionObjectDependencies, LocalMapVariablesShouldBeFilteredOutOfDependencies) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + expCtx->variablesParseState.defineVariable("var1"); + auto mapSpec = BSON("$map" << BSON("input" + << "$field1" + << "as" + << "var2" + << "in" + << BSON("$multiply" << BSON_ARRAY("$$var1" + << "$$var2")))); + + auto expressionMap = + ExpressionMap::parse(expCtx, mapSpec.firstElement(), expCtx->variablesParseState); + DepsTracker deps; + expressionMap->addDependencies(&deps); + ASSERT_EQ(deps.vars.size(), 1UL); + ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin()); +} + +TEST(ExpressionObjectDependencies, LocalFilterVariablesShouldBeFilteredOutOfDependencies) { + intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + expCtx->variablesParseState.defineVariable("var1"); + auto filterSpec = BSON("$filter" << BSON("input" << BSON_ARRAY(1 << 2 << 3) << "as" + << "var2" + << "cond" + << BSON("$gt" << BSON_ARRAY("$$var1" + << "$$var2")))); + + auto expressionFilter = + ExpressionFilter::parse(expCtx, filterSpec.firstElement(), expCtx->variablesParseState); + DepsTracker deps; + expressionFilter->addDependencies(&deps); + ASSERT_EQ(deps.vars.size(), 1UL); + ASSERT_EQ(expCtx->variablesParseState.getVariable("var1"), *deps.vars.begin()); +} + // // Optimizations. // diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index d4a00168e3d..34e04912e49 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -217,7 +217,7 @@ void Pipeline::optimizePipeline() { // We could be swapping around stages during this process, so disconnect the pipeline to prevent // us from entering a state with dangling pointers. unstitch(); - while (itr != _sources.end() && std::next(itr) != _sources.end()) { + while (itr != _sources.end()) { invariant((*itr).get()); itr = (*itr).get()->optimizeAt(itr, &_sources); } @@ -508,19 +508,35 @@ void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) { _sources.push_front(source); } +void Pipeline::addFinalSource(intrusive_ptr<DocumentSource> source) { + if (!_sources.empty()) { + source->setSource(_sources.back().get()); + } + _sources.push_back(source); +} + DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAvailable) const { DepsTracker deps(metadataAvailable); + const bool scopeHasVariables = pCtx->variablesParseState.hasDefinedVariables(); + bool skipFieldsAndMetadataDeps = false; bool knowAllFields = false; bool knowAllMeta = false; for (auto&& source : _sources) { DepsTracker localDeps(deps.getMetadataAvailable()); DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps); - if (status == DocumentSource::NOT_SUPPORTED) { + deps.vars.insert(localDeps.vars.begin(), localDeps.vars.end()); + + if ((skipFieldsAndMetadataDeps |= (status == DocumentSource::NOT_SUPPORTED))) { // Assume this stage needs everything. We may still know something about our - // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or - // EXHAUSTIVE_META. - break; + // dependencies if an earlier stage returned EXHAUSTIVE_FIELDS or EXHAUSTIVE_META. If + // this scope has variables, we need to keep enumerating the remaining stages but will + // skip adding any further field or metadata dependencies. + if (scopeHasVariables) { + continue; + } else { + break; + } } if (!knowAllFields) { @@ -540,7 +556,9 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; } - if (knowAllMeta && knowAllFields) { + // If there are variables defined at this pipeline's scope, there may be dependencies upon + // them in subsequent stages. Keep enumerating. + if (knowAllMeta && knowAllFields && !scopeHasVariables) { break; } } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 5da1a018289..c5e94837019 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -245,9 +245,11 @@ public: */ std::vector<Value> serialize() const; - /// The initial source is special since it varies between mongos and mongod. + // The initial source is special since it varies between mongos and mongod. void addInitialSource(boost::intrusive_ptr<DocumentSource> source); + void addFinalSource(boost::intrusive_ptr<DocumentSource> source); + /** * Returns the next result from the pipeline, or boost::none if there are no more results. */ diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 33043acde9e..8f15e12e8bb 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -191,7 +191,8 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace // than the DocumentSource this MongodImplementation is injected into, but both // ExpressionContext instances should still have the same OperationContext. @@ -202,7 +203,26 @@ public: return pipeline.getStatus(); } - pipeline.getValue()->optimizePipeline(); + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + Status cursorStatus = Status::OK(); + + if (opts.attachCursorSource) { + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + } else if (opts.forceInjectMongod) { + PipelineD::injectMongodInterface(pipeline.getValue().get()); + } + + return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; + } + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + invariant(_ctx->opCtx == expCtx->opCtx); + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); boost::optional<AutoGetCollectionForReadCommand> autoColl; if (expCtx->uuid) { @@ -226,10 +246,9 @@ public: auto css = CollectionShardingState::get(_ctx->opCtx, expCtx->ns); uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata())); - PipelineD::prepareCursorSource( - autoColl->getCollection(), expCtx->ns, nullptr, pipeline.getValue().get()); + PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - return pipeline; + return Status::OK(); } std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, @@ -460,6 +479,15 @@ BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { } } // namespace +void PipelineD::injectMongodInterface(Pipeline* pipeline) { + for (auto&& source : pipeline->_sources) { + if (auto needsMongod = dynamic_cast<DocumentSourceNeedsMongod*>(source.get())) { + needsMongod->injectMongodInterface( + std::make_shared<MongodImplementation>(pipeline->getContext())); + } + } +} + void PipelineD::prepareCursorSource(Collection* collection, const NamespaceString& nss, const AggregationRequest* aggRequest, @@ -470,13 +498,7 @@ void PipelineD::prepareCursorSource(Collection* collection, Pipeline::SourceContainer& sources = pipeline->_sources; // Inject a MongodImplementation to sources that need them. - for (auto&& source : sources) { - DocumentSourceNeedsMongod* needsMongod = - dynamic_cast<DocumentSourceNeedsMongod*>(source.get()); - if (needsMongod) { - needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(expCtx)); - } - } + injectMongodInterface(pipeline); if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { return; diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 3c1acaac998..0da6ebe2f20 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -81,6 +81,11 @@ public: const AggregationRequest* aggRequest, Pipeline* pipeline); + /** + * Injects a MongodInterface into stages which require access to mongod-specific functionality. + */ + static void injectMongodInterface(Pipeline* pipeline); + static std::string getPlanSummaryStr(const Pipeline* pipeline); static void getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut); diff --git a/src/mongo/db/pipeline/sequential_document_cache.cpp b/src/mongo/db/pipeline/sequential_document_cache.cpp new file mode 100644 index 00000000000..93d95c0e072 --- /dev/null +++ b/src/mongo/db/pipeline/sequential_document_cache.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/sequential_document_cache.h" + +#include "mongo/base/error_codes.h" +#include "mongo/base/status.h" + +namespace mongo { + +void SequentialDocumentCache::add(Document doc) { + invariant(_status == CacheStatus::kBuilding); + + if (checkCacheSize(doc) != CacheStatus::kAbandoned) { + _sizeBytes += doc.getApproximateSize(); + _cache.push_back(std::move(doc)); + } +} + +void SequentialDocumentCache::freeze() { + invariant(_status == CacheStatus::kBuilding); + + _status = CacheStatus::kServing; + _cache.shrink_to_fit(); + + _cacheIter = _cache.begin(); +} + +void SequentialDocumentCache::abandon() { + _status = CacheStatus::kAbandoned; + + _cache.clear(); + _cache.shrink_to_fit(); + + _cacheIter = _cache.begin(); +} + +boost::optional<Document> SequentialDocumentCache::getNext() { + invariant(_status == CacheStatus::kServing); + + if (_cacheIter == _cache.end()) { + return boost::none; + } + + return *_cacheIter++; +} + +void SequentialDocumentCache::restartIteration() { + invariant(_status == CacheStatus::kServing); + _cacheIter = _cache.begin(); +} + +SequentialDocumentCache::CacheStatus SequentialDocumentCache::checkCacheSize(const Document& doc) { + if (_sizeBytes + doc.getApproximateSize() > _maxSizeBytes) { + abandon(); + } + + return _status; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/sequential_document_cache.h b/src/mongo/db/pipeline/sequential_document_cache.h new file mode 100644 index 00000000000..625369d934c --- /dev/null +++ b/src/mongo/db/pipeline/sequential_document_cache.h @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/optional/optional.hpp> +#include <stddef.h> +#include <vector> + +#include "mongo/db/pipeline/document.h" + +#include "mongo/base/status.h" + +namespace mongo { + +/** + * Implements a sequential cache of Documents, up to an optional maximum size. Can be in one of + * three states: building, serving, or abandoned. See SequentialDocumentCache::CacheStatus. + */ +class SequentialDocumentCache { + MONGO_DISALLOW_COPYING(SequentialDocumentCache); + +public: + explicit SequentialDocumentCache(size_t maxCacheSizeBytes) : _maxSizeBytes(maxCacheSizeBytes) {} + + SequentialDocumentCache(SequentialDocumentCache&& moveFrom) + : _status(moveFrom._status), + _maxSizeBytes(moveFrom._maxSizeBytes), + _sizeBytes(moveFrom._sizeBytes), + _cacheIter(std::move(moveFrom._cacheIter)), + _cache(std::move(moveFrom._cache)) {} + + SequentialDocumentCache& operator=(SequentialDocumentCache&& moveFrom) { + _cacheIter = std::move(moveFrom._cacheIter); + _maxSizeBytes = moveFrom._maxSizeBytes; + _cache = std::move(moveFrom._cache); + _sizeBytes = moveFrom._sizeBytes; + _status = moveFrom._status; + + return *this; + } + + /** + * Defines the states that the cache may be in at any given time. + */ + enum class CacheStatus { + // The cache is being filled. More documents may be added. A newly instantiated cache is in + // this state by default. + kBuilding, + + // The caller has invoked freeze() to indicate that no more Documents need to be added. The + // cache is read-only at this point. + kServing, + + // The maximum permitted cache size has been exceeded, or the caller has explicitly + // abandoned the cache. Cannot add more documents or call getNext. + kAbandoned, + }; + + /** + * Adds a document to the back of the cache. May only be called while the cache is in + * 'kBuilding' mode. + */ + void add(Document doc); + + /** + * Moves the cache into 'kServing' (read-only) mode, and attempts to release any excess + * allocated memory. May only be called while the cache is in 'kBuilding' mode. + */ + void freeze(); + + /** + * Abandons the cache, marking it as 'kAbandoned' and freeing any memory allocated while + * building. + */ + void abandon(); + + /** + * Returns the next Document in sequence from the cache, or boost::none if the end of the cache + * has been reached. May only be called while in 'kServing' mode. + */ + boost::optional<Document> getNext(); + + /** + * Resets the cache iterator to the beginning of the cache. May only be called while the cache + * is in 'kServing' mode. + */ + void restartIteration(); + + CacheStatus status() const { + return _status; + } + + size_t sizeBytes() const { + return _sizeBytes; + } + + size_t maxSizeBytes() const { + return _maxSizeBytes; + } + + size_t count() const { + return _cache.size(); + } + + bool isBuilding() const { + return _status == CacheStatus::kBuilding; + } + + bool isServing() const { + return _status == CacheStatus::kServing; + } + + bool isAbandoned() const { + return _status == CacheStatus::kAbandoned; + } + +private: + CacheStatus checkCacheSize(const Document& doc); + + CacheStatus _status = CacheStatus::kBuilding; + size_t _maxSizeBytes = 0; + size_t _sizeBytes = 0; + + std::vector<Document>::iterator _cacheIter; + std::vector<Document> _cache; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/sequential_document_cache_test.cpp b/src/mongo/db/pipeline/sequential_document_cache_test.cpp new file mode 100644 index 00000000000..5372a80ecfe --- /dev/null +++ b/src/mongo/db/pipeline/sequential_document_cache_test.cpp @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/sequential_document_cache.h" + +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +const size_t kCacheSizeBytes = 1024; + +TEST(SequentialDocumentCacheTest, CacheIsInBuildingModeUponInstantiation) { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); +} + +TEST(SequentialDocumentCacheTest, CanAddDocumentsToCacheWhileBuilding) { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.add(DOC("_id" << 0)); + cache.add(DOC("_id" << 1)); + + ASSERT_EQ(cache.count(), 2ul); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotIterateCacheWhileBuilding, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.getNext(); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotRestartCacheIterationWhileBuilding, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.restartIteration(); +} + +TEST(SequentialDocumentCacheTest, CanIterateCacheAfterFreezing) { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.add(DOC("_id" << 0)); + cache.add(DOC("_id" << 1)); + + ASSERT_EQ(cache.count(), 2ul); + + cache.freeze(); + + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0)); + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1)); + ASSERT_FALSE(cache.getNext().is_initialized()); +} + +TEST(SequentialDocumentCacheTest, CanRestartCacheIterationAfterFreezing) { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.add(DOC("_id" << 0)); + cache.add(DOC("_id" << 1)); + + ASSERT_EQ(cache.count(), 2ul); + + cache.freeze(); + + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0)); + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1)); + ASSERT_FALSE(cache.getNext().is_initialized()); + + cache.restartIteration(); + + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 0)); + ASSERT_DOCUMENT_EQ(*cache.getNext(), DOC("_id" << 1)); + ASSERT_FALSE(cache.getNext().is_initialized()); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotAddDocumentsToCacheAfterFreezing, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + cache.freeze(); + + ASSERT(cache.isServing()); + + cache.add(DOC("_id" << 0)); +} + +TEST(SequentialDocumentCacheTest, ShouldAbandonCacheIfMaxSizeBytesExceeded) { + SequentialDocumentCache cache(0); + ASSERT(cache.isBuilding()); + + cache.add(DOC("_id" << 0)); + + ASSERT(cache.isAbandoned()); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotAddDocumentsToAbandonedCache, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + cache.abandon(); + + cache.add(DOC("_id" << 0)); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotFreezeCacheWhenAbandoned, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + cache.abandon(); + + cache.freeze(); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotRestartCacheIterationWhenAbandoned, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.abandon(); + + cache.restartIteration(); +} + +DEATH_TEST(SequentialDocumentCacheTest, CannotCallGetNextWhenAbandoned, "invariant") { + SequentialDocumentCache cache(kCacheSizeBytes); + ASSERT(cache.isBuilding()); + + cache.add(DOC("_id" << 0)); + cache.add(DOC("_id" << 1)); + + cache.abandon(); + + cache.getNext(); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/stub_mongod_interface.h b/src/mongo/db/pipeline/stub_mongod_interface.h index 26127f3d241..f49211a13ac 100644 --- a/src/mongo/db/pipeline/stub_mongod_interface.h +++ b/src/mongo/db/pipeline/stub_mongod_interface.h @@ -93,7 +93,13 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) override { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) override { + MONGO_UNREACHABLE; + } + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/variables.cpp b/src/mongo/db/pipeline/variables.cpp index ac713a37f5a..fc7983dbf5e 100644 --- a/src/mongo/db/pipeline/variables.cpp +++ b/src/mongo/db/pipeline/variables.cpp @@ -154,7 +154,9 @@ Variables::Id VariablesParseState::defineVariable(StringData name) { Variables::kBuiltinVarNameToId.find(name) == Variables::kBuiltinVarNameToId.end()); Variables::Id id = _idGenerator->generateId(); - _variables[name] = id; + invariant(id > _lastSeen); + + _variables[name] = _lastSeen = id; return id; } @@ -176,4 +178,14 @@ Variables::Id VariablesParseState::getVariable(StringData name) const { uassert(17276, str::stream() << "Use of undefined variable: " << name, name == "CURRENT"); return Variables::kRootId; } + +std::set<Variables::Id> VariablesParseState::getDefinedVariableIDs() const { + std::set<Variables::Id> ids; + + for (auto&& keyId : _variables) { + ids.insert(keyId.second); + } + + return ids; +} } diff --git a/src/mongo/db/pipeline/variables.h b/src/mongo/db/pipeline/variables.h index 4e487e811f4..0d8dd403c36 100644 --- a/src/mongo/db/pipeline/variables.h +++ b/src/mongo/db/pipeline/variables.h @@ -45,7 +45,8 @@ public: using Id = int64_t; /** - * Generates Variables::Id and keeps track of the number of Ids handed out. + * Generates Variables::Id and keeps track of the number of Ids handed out. Each successive Id + * generated by an instance of this class must be greater than all preceding Ids. */ class IdGenerator { public: @@ -141,11 +142,23 @@ public: Variables::Id defineVariable(StringData name); /** + * Returns true if there are any variables defined in this scope. + */ + bool hasDefinedVariables() const { + return !_variables.empty(); + } + + /** * Returns the current Id for a variable. uasserts if the variable isn't defined. */ Variables::Id getVariable(StringData name) const; /** + * Returns the set of variable IDs defined at this scope. + */ + std::set<Variables::Id> getDefinedVariableIDs() const; + + /** * Return a copy of this VariablesParseState. Will replace the copy's '_idGenerator' pointer * with 'idGenerator'. */ @@ -160,6 +173,7 @@ private: Variables::IdGenerator* _idGenerator; StringMap<Variables::Id> _variables; + Variables::Id _lastSeen = -1; }; } // namespace mongo |