diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-02-06 12:23:23 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-02-14 16:54:47 -0500 |
commit | f1da8170c1c568762b29e44910afbcc6f01a644f (patch) | |
tree | 6c45403feafcc9748500bb809cb0d7c5a6ce19de | |
parent | b494c59557d464d1e13f7c36e7148b48e4d87208 (diff) | |
download | mongo-f1da8170c1c568762b29e44910afbcc6f01a644f.tar.gz |
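SERVER-38360 Prevent $out from writing to a namespace that is read from in the same pipeline
This restriction only applies to $out with the in-place modes "insertDocuments" and
"replaceDocuments"; mode "replaceCollection" remains legal because it writes via a temporary collection.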
-rw-r--r-- | jstests/aggregation/sources/out/out_to_referenced_collection.js | 132 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 31 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 155 |
12 files changed, 380 insertions, 90 deletions
diff --git a/jstests/aggregation/sources/out/out_to_referenced_collection.js b/jstests/aggregation/sources/out/out_to_referenced_collection.js new file mode 100644 index 00000000000..a85f90a9816 --- /dev/null +++ b/jstests/aggregation/sources/out/out_to_referenced_collection.js @@ -0,0 +1,132 @@ +// Tests that the server behaves as expected when an $out stage is targeting a collection which is +// involved in the aggregate in some other way, e.g. as the source namespace or via a $lookup. We +// disallow this combination in an effort to prevent the "halloween problem" of a never-ending +// query. If the $out is using mode "replaceCollection" then this is legal because we use a +// temporary collection. If the out is using any other mode which would be "in place", we expect the +// server to error in an effort to prevent server-side infinite loops. +// This test issues queries over views, so cannot be run in passthroughs which implicitly shard +// collections. +// @tags: [assumes_unsharded_collection] +(function() { + 'use strict'; + + load('jstests/aggregation/extras/out_helpers.js'); // For 'withEachOutMode'. + load('jstests/libs/fixture_helpers.js'); // For 'FixtureHelpers'. + + const testDB = db.getSiblingDB("out_to_referenced_coll"); + const coll = testDB.test; + + withEachOutMode(mode => { + coll.drop(); + if (FixtureHelpers.isSharded(coll) && mode === "replaceCollection") { + return; // Not a supported combination. + } + + // Seed the collection to ensure each pipeline will actually do something. + assert.commandWorked(coll.insert({_id: 0})); + + // Each of the following assertions will somehow use $out to write to a namespace that is + // being read from elsewhere in the pipeline. This is legal with mode "replaceCollection". + const assertFailsWithCode = ((fn) => { + const error = assert.throws(fn); + assert.contains(error.code, [50992, 51079]); + }); + const asserter = (mode === "replaceCollection") ? 
assert.doesNotThrow : assertFailsWithCode; + + // Test $out to the aggregate command's source collection. + asserter(() => coll.aggregate([{$out: {to: coll.getName(), mode: mode}}])); + // Test $out to the same namespace as a $lookup which is the same as the aggregate command's + // source collection. + asserter(() => coll.aggregate([ + {$lookup: {from: coll.getName(), as: "x", localField: "f_id", foreignField: "_id"}}, + {$out: {to: coll.getName(), mode: mode}} + ])); + // Test $out to the same namespace as a $lookup which is *not* the same as the aggregate + // command's source collection. + asserter(() => coll.aggregate([ + {$lookup: {from: "bar", as: "x", localField: "f_id", foreignField: "_id"}}, + {$out: {to: "bar", mode: mode}} + ])); + // Test $out to the same namespace as a $graphLookup. + asserter(() => coll.aggregate([ + { + $graphLookup: { + from: "bar", + startWith: "$_id", + connectFromField: "_id", + connectToField: "parent_id", + as: "graph", + } + }, + {$out: {to: "bar", mode: mode}} + ])); + // Test $out to the same namespace as a $lookup which is nested within another $lookup. + asserter(() => coll.aggregate([ + { + $lookup: { + from: "bar", + as: "x", + let : {}, + pipeline: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}] + } + }, + {$out: {to: "TARGET", mode: mode}} + ])); + // Test $out to the same namespace as a $lookup which is nested within a $facet. + asserter(() => coll.aggregate([ + { + $facet: { + y: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}], + } + }, + {$out: {to: "TARGET", mode: mode}} + ])); + asserter(() => coll.aggregate([ + { + $facet: { + x: [{$lookup: {from: "other", as: "y", pipeline: []}}], + y: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}], + } + }, + {$out: {to: "TARGET", mode: mode}} + ])); + + // Test that we use the resolved namespace of a view to detect this sort of halloween + // problem. 
+ assert.commandWorked( + testDB.runCommand({create: "view_on_TARGET", viewOn: "TARGET", pipeline: []})); + asserter(() => testDB.view_on_TARGET.aggregate([{$out: {to: "TARGET", mode: mode}}])); + asserter(() => coll.aggregate([ + { + $facet: { + x: [{$lookup: {from: "other", as: "y", pipeline: []}}], + y: [{ + $lookup: { + from: "yet_another", + as: "y", + pipeline: [{$lookup: {from: "view_on_TARGET", as: "z", pipeline: []}}] + } + }], + } + }, + {$out: {to: "TARGET", mode: mode}} + ])); + + function generateNestedPipeline(foreignCollName, numLevels) { + let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}]; + + for (let level = 1; level < numLevels; level++) { + pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}]; + } + + return pipeline; + } + + const nestedPipeline = + generateNestedPipeline("lookup", 20).concat([{$out: {to: "lookup", mode: mode}}]); + asserter(() => coll.aggregate(nestedPipeline)); + + testDB.dropDatabase(); // Clear any of the collections which would be created by the + // successful "replaceCollection" mode of this test. 
+ }); +}()); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 8ecefdd485f..3d0ba7fadda 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -295,11 +295,11 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames Status collatorCompatibleWithPipeline(OperationContext* opCtx, Database* db, const CollatorInterface* collator, - const Pipeline* pipeline) { - if (!db || !pipeline) { + const LiteParsedPipeline& liteParsedPipeline) { + if (!db) { return Status::OK(); } - for (auto&& potentialViewNs : pipeline->getInvolvedCollections()) { + for (auto&& potentialViewNs : liteParsedPipeline.getInvolvedNamespaces()) { if (db->getCollection(opCtx, potentialViewNs)) { continue; } @@ -538,7 +538,7 @@ Status runAggregate(OperationContext* opCtx, if (!pipelineInvolvedNamespaces.empty()) { invariant(ctx); auto pipelineCollationStatus = collatorCompatibleWithPipeline( - opCtx, ctx->getDb(), expCtx->getCollator(), pipeline.get()); + opCtx, ctx->getDb(), expCtx->getCollator(), liteParsedPipeline); if (!pipelineCollationStatus.isOK()) { return pipelineCollationStatus; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 8ccec0231c1..1e6efa6dc54 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -298,9 +298,12 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; /** - * If DocumentSource uses additional collections, it adds the namespaces to the input vector. + * If this stage uses additional namespaces, adds them to 'collectionNames'. These namespaces + * should all be names of collections, not views. 
*/ - virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {} + virtual void addInvolvedCollections( + stdx::unordered_set<NamespaceString>* collectionNames) const {} + virtual void detachFromOperationContext() {} diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index ab927d4f228..c3085279e8f 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -208,10 +208,11 @@ Value DocumentSourceFacet::serialize(boost::optional<ExplainOptions::Verbosity> return Value(Document{{"$facet", serialized.freezeToValue()}}); } -void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const { +void DocumentSourceFacet::addInvolvedCollections( + stdx::unordered_set<NamespaceString>* collectionNames) const { for (auto&& facet : _facets) { for (auto&& source : facet.pipeline->getSources()) { - source->addInvolvedCollections(collections); + source->addInvolvedCollections(collectionNames); } } } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 6b8f2bcac46..8c82ecf7cb1 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -135,7 +135,7 @@ public: } // The following are overridden just to forward calls to sub-pipelines. 
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final; + void addInvolvedCollections(stdx::unordered_set<NamespaceString>* involvedNssSet) const final; void detachFromOperationContext() final; void reattachToOperationContext(OperationContext* opCtx) final; StageConstraints constraints(Pipeline::SplitState pipeState) const final; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index ee745dedda6..eea8b370cdf 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -476,7 +476,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage // we'll eventually construct from the input document. _fromPipeline.reserve(_fromPipeline.size() + 1); - _fromPipeline.push_back(BSONObj()); + _fromPipeline.push_back(BSON("$match" << BSONObj())); } intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create( @@ -602,4 +602,13 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( return std::move(newSource); } + +void DocumentSourceGraphLookUp::addInvolvedCollections( + stdx::unordered_set<NamespaceString>* collectionNames) const { + collectionNames->insert(_fromExpCtx->ns); + auto introspectionPipeline = uassertStatusOK(Pipeline::parse(_fromPipeline, _fromExpCtx)); + for (auto&& stage : introspectionPipeline->getSources()) { + stage->addInvolvedCollections(collectionNames); + } +} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index fb6703c97dd..4077c058770 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -83,9 +83,7 @@ public: return DepsTracker::State::SEE_NEXT; }; - void 
addInvolvedCollections(std::vector<NamespaceString>* collections) const final { - collections->push_back(_from); - } + void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final; void detachFromOperationContext() final; diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index a900b777073..8a3a97097d1 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -47,39 +47,20 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; -namespace { -std::string pipelineToString(const vector<BSONObj>& pipeline) { - StringBuilder sb; - sb << "["; - - auto first = true; - for (auto& stageSpec : pipeline) { - if (!first) { - sb << ", "; - } else { - first = false; - } - sb << stageSpec; - } - sb << "]"; - return sb.str(); -} -} // namespace - constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth; DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), _fromNs(std::move(fromNs)), _as(std::move(as)), - _variables(pExpCtx->variables), - _variablesParseState(pExpCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { - const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs); + _variables(expCtx->variables), + _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) { + const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs); _resolvedNs = resolvedNamespace.ns; _resolvedPipeline = resolvedNamespace.pipeline; - _fromExpCtx = pExpCtx->copyWith(_resolvedNs); + _fromExpCtx = expCtx->copyWith(_resolvedNs); _fromExpCtx->subPipelineDepth += 1; uassert(ErrorCodes::MaxSubPipelineDepthExceeded, @@ -92,22 +73,23 @@ 
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, std::string localField, std::string foreignField, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceLookUp(fromNs, as, pExpCtx) { + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceLookUp(fromNs, as, expCtx) { _localField = std::move(localField); _foreignField = std::move(foreignField); // We append an additional BSONObj to '_resolvedPipeline' as a placeholder for the $match stage // we'll eventually construct from the input document. _resolvedPipeline.reserve(_resolvedPipeline.size() + 1); - _resolvedPipeline.push_back(BSONObj()); + _resolvedPipeline.push_back(BSON("$match" << BSONObj())); + initializeResolvedIntrospectionPipeline(); } DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, std::vector<BSONObj> pipeline, BSONObj letVariables, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceLookUp(fromNs, as, pExpCtx) { + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceLookUp(fromNs, as, expCtx) { // '_resolvedPipeline' will first be initialized by the constructor delegated to within this // constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs' // represents a view. 
We append the user 'pipeline' to the end of '_resolvedPipeline' to ensure @@ -124,11 +106,11 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, _letVariables.emplace_back( varName.toString(), - Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState), + Expression::parseOperand(expCtx, varElem, expCtx->variablesParseState), _variablesParseState.defineVariable(varName)); } - initializeIntrospectionPipeline(); + initializeResolvedIntrospectionPipeline(); } std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse( @@ -190,7 +172,7 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { // transaction requirement from the children in its pipeline. if (wasConstructedWithPipelineSyntax()) { const auto resolvedRequirements = StageConstraints::resolveDiskUseAndTransactionRequirement( - _parsedIntrospectionPipeline->getSources()); + _resolvedIntrospectionPipeline->getSources()); diskRequirement = resolvedRequirements.first; txnRequirement = resolvedRequirements.second; } @@ -465,14 +447,6 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt( return itr; } -std::string DocumentSourceLookUp::getUserPipelineDefinition() { - if (wasConstructedWithPipelineSyntax()) { - return pipelineToString(_userPipeline); - } - - return _resolvedPipeline.back().toString(); -} - bool DocumentSourceLookUp::usedDisk() { if (_pipeline) _usedDisk = _usedDisk || _pipeline->usedDisk(); @@ -654,11 +628,12 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl } } -void DocumentSourceLookUp::initializeIntrospectionPipeline() { +void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() { copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + _resolvedIntrospectionPipeline = + uassertStatusOK(Pipeline::parse(_resolvedPipeline, 
_fromExpCtx)); - auto& sources = _parsedIntrospectionPipeline->getSources(); + auto& sources = _resolvedIntrospectionPipeline->getSources(); // Ensure that the pipeline does not contain a $changeStream stage. This check will be // performed recursively on all sub-pipelines. @@ -685,7 +660,7 @@ void DocumentSourceLookUp::serializeToArray( auto pipeline = _userPipeline; if (_additionalFilter) { - pipeline.push_back(BSON("$match" << *_additionalFilter)); + pipeline.emplace_back(BSON("$match" << *_additionalFilter)); } doc = Document{{getSourceName(), @@ -741,7 +716,7 @@ void DocumentSourceLookUp::serializeToArray( DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { if (wasConstructedWithPipelineSyntax()) { // We will use the introspection pipeline which we prebuilt during construction. - invariant(_parsedIntrospectionPipeline); + invariant(_resolvedIntrospectionPipeline); // We are not attempting to enforce that any referenced metadata are in fact available, // this is done elsewhere. We only need to know what variable dependencies exist in the @@ -752,7 +727,7 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables // declared by this $lookup and variables declared externally. 
- for (auto&& source : _parsedIntrospectionPipeline->getSources()) { + for (auto&& source : _resolvedIntrospectionPipeline->getSources()) { source->getDependencies(&subDeps); } @@ -839,7 +814,9 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( } uassert(ErrorCodes::FailedToParse, - str::stream() << "$lookup argument '" << argument << "' must be a string, is type " + str::stream() << "$lookup argument '" << argName << "' must be a string, found " + << argument + << ": " << argument.type(), argument.type() == BSONType::String); @@ -887,4 +864,13 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( pExpCtx); } } + +void DocumentSourceLookUp::addInvolvedCollections( + stdx::unordered_set<NamespaceString>* collectionNames) const { + collectionNames->insert(_resolvedNs); + for (auto&& stage : _resolvedIntrospectionPipeline->getSources()) { + stage->addInvolvedCollections(collectionNames); + } } + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index a657c022b44..dc233632496 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -126,14 +126,7 @@ public: return MergingLogic{nullptr, this, boost::none}; } - void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { - collections->push_back(_fromNs); - if (_parsedIntrospectionPipeline) { - for (auto&& stage : _parsedIntrospectionPipeline->getSources()) { - stage->addInvolvedCollections(collections); - } - } - } + void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final; void detachFromOperationContext() final; @@ -142,13 +135,13 @@ public: bool usedDisk() final; static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); static 
boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize( BSONElement elem, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const boost::intrusive_ptr<ExpressionContext>& expCtx, size_t maxCacheSizeBytes) { - auto dsLookup = createFromBson(elem, pExpCtx); + auto dsLookup = createFromBson(elem, expCtx); static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes); return dsLookup; } @@ -215,7 +208,7 @@ private: */ DocumentSourceLookUp(NamespaceString fromNs, std::string as, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Constructor used for a $lookup stage specified using the {from: ..., localField: ..., @@ -225,7 +218,7 @@ private: std::string as, std::string localField, std::string foreignField, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as: @@ -235,7 +228,7 @@ private: std::string as, std::vector<BSONObj> pipeline, BSONObj letVariables, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Should not be called; use serializeToArray instead. @@ -262,7 +255,7 @@ private: * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup * pipelines will be built recursively. */ - void initializeIntrospectionPipeline(); + void initializeResolvedIntrospectionPipeline(); /** * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a @@ -271,12 +264,6 @@ private: std::unique_ptr<Pipeline, PipelineDeleter> buildPipeline(const Document& inputDoc); /** - * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that - * is executed in that it will not include optimizations or resolved views. 
- */ - std::string getUserPipelineDefinition(); - - /** * Reinitialize the cache with a new max size. May only be called if this DSLookup was created * with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added * to it. @@ -321,7 +308,7 @@ private: std::vector<BSONObj> _userPipeline; // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective // functions. If sub-$lookup stages are present, their pipelines are constructed recursively. - std::unique_ptr<Pipeline, PipelineDeleter> _parsedIntrospectionPipeline; + std::unique_ptr<Pipeline, PipelineDeleter> _resolvedIntrospectionPipeline; std::vector<LetVariable> _letVariables; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 80c175e3f73..677daa986f8 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -44,6 +44,7 @@ #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_out_in_place.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" @@ -158,7 +159,9 @@ void Pipeline::validateTopLevelPipeline() const { uasserted(ErrorCodes::InvalidNamespace, "{aggregate: 1} is not valid for an empty pipeline."); } - } else if ("$mergeCursors"_sd != _sources.front()->getSourceName()) { + return; + } + if ("$mergeCursors"_sd != _sources.front()->getSourceName()) { // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, // {aggregate: 1} is only valid for collectionless sources, and vice-versa. 
const auto firstStageConstraints = _sources.front()->constraints(_splitState); @@ -189,6 +192,21 @@ void Pipeline::validateTopLevelPipeline() const { } } } + // Make sure we aren't reading and writing to the same namespace for an $out. This is + // allowed for mode "replaceCollection", but not for the in-place modes. + if (auto outStage = dynamic_cast<DocumentSourceOutInPlace*>(_sources.back().get())) { + const auto& outNss = outStage->getOutputNs(); + stdx::unordered_set<NamespaceString> collectionNames; + // In order to gather only the involved namespaces which we are reading from, not the one + // we are writing to, skip the final stage as we know it is an $out stage. + for (auto it = _sources.begin(); it != std::prev(_sources.end()); it++) { + (*it)->addInvolvedCollections(&collectionNames); + } + uassert(51079, + "Cannot use $out to write to the same namespace being read from elsewhere in the " + "pipeline unless $out's mode is \"replaceCollection\"", + collectionNames.find(outNss) == collectionNames.end()); + } } void Pipeline::validateFacetPipeline() const { @@ -412,12 +430,12 @@ bool Pipeline::requiredToRunOnMongos() const { return false; } -std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { - std::vector<NamespaceString> collections; +stdx::unordered_set<NamespaceString> Pipeline::getInvolvedCollections() const { + stdx::unordered_set<NamespaceString> collectionNames; for (auto&& source : _sources) { - source->addInvolvedCollections(&collections); + source->addInvolvedCollections(&collectionNames); } - return collections; + return collectionNames; } vector<Value> Pipeline::serialize() const { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 7c06f4adbd0..849143dd94e 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -233,9 +233,10 @@ public: /** * Returns any other collections involved in the pipeline in addition to the collection the - * aggregation is run on. 
+ * aggregation is run on. All namespaces returned are the names of collections, after views have + * been resolved. */ - std::vector<NamespaceString> getInvolvedCollections() const; + stdx::unordered_set<NamespaceString> getInvolvedCollections() const; /** * Serializes the pipeline into a form that can be parsed into an equivalent pipeline. diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 963ead6ec2e..0c3d87617c1 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -39,7 +39,10 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_facet.h" +#include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_internal_split_pipeline.h" +#include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -3034,6 +3037,158 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) { ASSERT_EQ(nameMap["a"], "a"); } +TEST(InvolvedNamespacesTest, NoInvolvedNamespacesForMatchSortProject) { + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), + DocumentSourceMatch::create(BSON("x" << 1), expCtx), + DocumentSourceSort::create(expCtx, BSON("y" << -1)), + DocumentSourceProject::create(BSON("x" << 1 << "y" << 1), expCtx)}, + expCtx)); + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT(involvedNssSet.empty()); +} + +TEST(InvolvedNamespacesTest, IncludesLookupNamespace) { + boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const NamespaceString lookupNss{"test", 
"foo"}; + const NamespaceString resolvedNss{"test", "bar"}; + expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}}); + auto lookupSpec = + fromjson("{$lookup: {from: 'foo', as: 'x', localField: 'foo_id', foreignField: '_id'}}"); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), + DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)}, + expCtx)); + + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT_EQ(involvedNssSet.size(), 1UL); + ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end()); +} + +TEST(InvolvedNamespacesTest, IncludesGraphLookupNamespace) { + boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const NamespaceString lookupNss{"test", "foo"}; + const NamespaceString resolvedNss{"test", "bar"}; + expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}}); + auto graphLookupSpec = fromjson( + "{$graphLookup: {" + " from: 'foo'," + " as: 'x'," + " connectFromField: 'x'," + " connectToField: 'y'," + " startWith: '$start'" + "}}"); + auto pipeline = unittest::assertGet(Pipeline::create( + {DocumentSourceMock::create(), + DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)}, + expCtx)); + + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT_EQ(involvedNssSet.size(), 1UL); + ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end()); +} + +TEST(InvolvedNamespacesTest, IncludesLookupSubpipelineNamespaces) { + boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const NamespaceString outerLookupNss{"test", "foo_outer"}; + const NamespaceString outerResolvedNss{"test", "bar_outer"}; + const NamespaceString innerLookupNss{"test", "foo_inner"}; + const NamespaceString innerResolvedNss{"test", "bar_inner"}; + expCtx->setResolvedNamespace(outerLookupNss, {outerResolvedNss, vector<BSONObj>{}}); + 
expCtx->setResolvedNamespace(innerLookupNss, {innerResolvedNss, vector<BSONObj>{}}); + auto lookupSpec = fromjson( + "{$lookup: {" + " from: 'foo_outer', " + " as: 'x', " + " pipeline: [{$lookup: {from: 'foo_inner', as: 'y', pipeline: []}}]" + "}}"); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), + DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)}, + expCtx)); + + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT_EQ(involvedNssSet.size(), 2UL); + ASSERT(involvedNssSet.find(outerResolvedNss) != involvedNssSet.end()); + ASSERT(involvedNssSet.find(innerResolvedNss) != involvedNssSet.end()); +} + +TEST(InvolvedNamespacesTest, IncludesGraphLookupSubPipeline) { + boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const NamespaceString outerLookupNss{"test", "foo_outer"}; + const NamespaceString outerResolvedNss{"test", "bar_outer"}; + const NamespaceString innerLookupNss{"test", "foo_inner"}; + const NamespaceString innerResolvedNss{"test", "bar_inner"}; + expCtx->setResolvedNamespace(outerLookupNss, {outerResolvedNss, vector<BSONObj>{}}); + expCtx->setResolvedNamespace( + outerLookupNss, + {outerResolvedNss, + vector<BSONObj>{fromjson("{$lookup: {from: 'foo_inner', as: 'x', pipeline: []}}")}}); + expCtx->setResolvedNamespace(innerLookupNss, {innerResolvedNss, vector<BSONObj>{}}); + auto graphLookupSpec = fromjson( + "{$graphLookup: {" + " from: 'foo_outer', " + " as: 'x', " + " connectFromField: 'x'," + " connectToField: 'y'," + " startWith: '$start'" + "}}"); + auto pipeline = unittest::assertGet(Pipeline::create( + {DocumentSourceMock::create(), + DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)}, + expCtx)); + + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT_EQ(involvedNssSet.size(), 2UL); + ASSERT(involvedNssSet.find(outerResolvedNss) != involvedNssSet.end()); + 
ASSERT(involvedNssSet.find(innerResolvedNss) != involvedNssSet.end()); +} + +TEST(InvolvedNamespacesTest, IncludesAllCollectionsWhenResolvingViews) { + boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest()); + const NamespaceString normalCollectionNss{"test", "collection"}; + const NamespaceString lookupNss{"test", "foo"}; + const NamespaceString resolvedNss{"test", "bar"}; + const NamespaceString nssIncludedInResolvedView{"test", "extra_backer_of_bar"}; + expCtx->setResolvedNamespace( + lookupNss, + {resolvedNss, + vector<BSONObj>{ + fromjson("{$lookup: {from: 'extra_backer_of_bar', as: 'x', pipeline: []}}")}}); + expCtx->setResolvedNamespace(nssIncludedInResolvedView, + {nssIncludedInResolvedView, vector<BSONObj>{}}); + expCtx->setResolvedNamespace(normalCollectionNss, {normalCollectionNss, vector<BSONObj>{}}); + auto facetSpec = fromjson( + "{$facet: {" + " pipe_1: [" + " {$lookup: {" + " from: 'foo'," + " as: 'x'," + " localField: 'foo_id'," + " foreignField: '_id'" + " }}" + " ]," + " pipe_2: [" + " {$lookup: {" + " from: 'collection'," + " as: 'z'," + " pipeline: []" + " }}" + " ]" + "}}"); + auto pipeline = unittest::assertGet( + Pipeline::create({DocumentSourceMock::create(), + DocumentSourceFacet::createFromBson(facetSpec.firstElement(), expCtx)}, + expCtx)); + + auto involvedNssSet = pipeline->getInvolvedCollections(); + ASSERT_EQ(involvedNssSet.size(), 3UL); + ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end()); + ASSERT(involvedNssSet.find(nssIncludedInResolvedView) != involvedNssSet.end()); + ASSERT(involvedNssSet.find(normalCollectionNss) != involvedNssSet.end()); +} + } // namespace class All : public Suite { |