summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/aggregation/sources/out/out_to_referenced_collection.js132
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source.h7
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h2
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp82
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h31
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp28
-rw-r--r--src/mongo/db/pipeline/pipeline.h5
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp155
12 files changed, 380 insertions, 90 deletions
diff --git a/jstests/aggregation/sources/out/out_to_referenced_collection.js b/jstests/aggregation/sources/out/out_to_referenced_collection.js
new file mode 100644
index 00000000000..a85f90a9816
--- /dev/null
+++ b/jstests/aggregation/sources/out/out_to_referenced_collection.js
@@ -0,0 +1,132 @@
+// Tests that the server behaves as expected when an $out stage is targeting a collection which is
+// involved in the aggregate in some other way, e.g. as the source namespace or via a $lookup. We
+// disallow this combination in an effort to prevent the "halloween problem" of a never-ending
+// query. If the $out is using mode "replaceCollection" then this is legal because we use a
+// temporary collection. If the out is using any other mode which would be "in place", we expect the
+// server to error in an effort to prevent server-side infinite loops.
+// This test issues queries over views, so cannot be run in passthroughs which implicitly shard
+// collections.
+// @tags: [assumes_unsharded_collection]
+(function() {
+ 'use strict';
+
+ load('jstests/aggregation/extras/out_helpers.js'); // For 'withEachOutMode'.
+ load('jstests/libs/fixture_helpers.js'); // For 'FixtureHelpers'.
+
+ const testDB = db.getSiblingDB("out_to_referenced_coll");
+ const coll = testDB.test;
+
+ withEachOutMode(mode => {
+ coll.drop();
+ if (FixtureHelpers.isSharded(coll) && mode === "replaceCollection") {
+ return; // Not a supported combination.
+ }
+
+ // Seed the collection to ensure each pipeline will actually do something.
+ assert.commandWorked(coll.insert({_id: 0}));
+
+ // Each of the following assertions will somehow use $out to write to a namespace that is
+ // being read from elsewhere in the pipeline. This is legal with mode "replaceCollection".
+ const assertFailsWithCode = ((fn) => {
+ const error = assert.throws(fn);
+ assert.contains(error.code, [50992, 51079]);
+ });
+ const asserter = (mode === "replaceCollection") ? assert.doesNotThrow : assertFailsWithCode;
+
+ // Test $out to the aggregate command's source collection.
+ asserter(() => coll.aggregate([{$out: {to: coll.getName(), mode: mode}}]));
+ // Test $out to the same namespace as a $lookup which is the same as the aggregate command's
+ // source collection.
+ asserter(() => coll.aggregate([
+ {$lookup: {from: coll.getName(), as: "x", localField: "f_id", foreignField: "_id"}},
+ {$out: {to: coll.getName(), mode: mode}}
+ ]));
+ // Test $out to the same namespace as a $lookup which is *not* the same as the aggregate
+ // command's source collection.
+ asserter(() => coll.aggregate([
+ {$lookup: {from: "bar", as: "x", localField: "f_id", foreignField: "_id"}},
+ {$out: {to: "bar", mode: mode}}
+ ]));
+ // Test $out to the same namespace as a $graphLookup.
+ asserter(() => coll.aggregate([
+ {
+ $graphLookup: {
+ from: "bar",
+ startWith: "$_id",
+ connectFromField: "_id",
+ connectToField: "parent_id",
+ as: "graph",
+ }
+ },
+ {$out: {to: "bar", mode: mode}}
+ ]));
+ // Test $out to the same namespace as a $lookup which is nested within another $lookup.
+ asserter(() => coll.aggregate([
+ {
+ $lookup: {
+ from: "bar",
+ as: "x",
+ let : {},
+ pipeline: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}]
+ }
+ },
+ {$out: {to: "TARGET", mode: mode}}
+ ]));
+ // Test $out to the same namespace as a $lookup which is nested within a $facet.
+ asserter(() => coll.aggregate([
+ {
+ $facet: {
+ y: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}],
+ }
+ },
+ {$out: {to: "TARGET", mode: mode}}
+ ]));
+ asserter(() => coll.aggregate([
+ {
+ $facet: {
+ x: [{$lookup: {from: "other", as: "y", pipeline: []}}],
+ y: [{$lookup: {from: "TARGET", as: "y", pipeline: []}}],
+ }
+ },
+ {$out: {to: "TARGET", mode: mode}}
+ ]));
+
+ // Test that we use the resolved namespace of a view to detect this sort of halloween
+ // problem.
+ assert.commandWorked(
+ testDB.runCommand({create: "view_on_TARGET", viewOn: "TARGET", pipeline: []}));
+ asserter(() => testDB.view_on_TARGET.aggregate([{$out: {to: "TARGET", mode: mode}}]));
+ asserter(() => coll.aggregate([
+ {
+ $facet: {
+ x: [{$lookup: {from: "other", as: "y", pipeline: []}}],
+ y: [{
+ $lookup: {
+ from: "yet_another",
+ as: "y",
+ pipeline: [{$lookup: {from: "view_on_TARGET", as: "z", pipeline: []}}]
+ }
+ }],
+ }
+ },
+ {$out: {to: "TARGET", mode: mode}}
+ ]));
+
+ function generateNestedPipeline(foreignCollName, numLevels) {
+ let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}];
+
+ for (let level = 1; level < numLevels; level++) {
+ pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}];
+ }
+
+ return pipeline;
+ }
+
+ const nestedPipeline =
+ generateNestedPipeline("lookup", 20).concat([{$out: {to: "lookup", mode: mode}}]);
+ asserter(() => coll.aggregate(nestedPipeline));
+
+ testDB.dropDatabase(); // Clear any of the collections which would be created by the
+ // successful "replaceCollection" mode of this test.
+ });
+}());
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 8ecefdd485f..3d0ba7fadda 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -295,11 +295,11 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
Status collatorCompatibleWithPipeline(OperationContext* opCtx,
Database* db,
const CollatorInterface* collator,
- const Pipeline* pipeline) {
- if (!db || !pipeline) {
+ const LiteParsedPipeline& liteParsedPipeline) {
+ if (!db) {
return Status::OK();
}
- for (auto&& potentialViewNs : pipeline->getInvolvedCollections()) {
+ for (auto&& potentialViewNs : liteParsedPipeline.getInvolvedNamespaces()) {
if (db->getCollection(opCtx, potentialViewNs)) {
continue;
}
@@ -538,7 +538,7 @@ Status runAggregate(OperationContext* opCtx,
if (!pipelineInvolvedNamespaces.empty()) {
invariant(ctx);
auto pipelineCollationStatus = collatorCompatibleWithPipeline(
- opCtx, ctx->getDb(), expCtx->getCollator(), pipeline.get());
+ opCtx, ctx->getDb(), expCtx->getCollator(), liteParsedPipeline);
if (!pipelineCollationStatus.isOK()) {
return pipelineCollationStatus;
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 8ccec0231c1..1e6efa6dc54 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -298,9 +298,12 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
/**
- * If DocumentSource uses additional collections, it adds the namespaces to the input vector.
+ * If this stage uses additional namespaces, adds them to 'collectionNames'. These namespaces
+ * should all be names of collections, not views.
*/
- virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {}
+ virtual void addInvolvedCollections(
+ stdx::unordered_set<NamespaceString>* collectionNames) const {}
+
virtual void detachFromOperationContext() {}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index ab927d4f228..c3085279e8f 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -208,10 +208,11 @@ Value DocumentSourceFacet::serialize(boost::optional<ExplainOptions::Verbosity>
return Value(Document{{"$facet", serialized.freezeToValue()}});
}
-void DocumentSourceFacet::addInvolvedCollections(vector<NamespaceString>* collections) const {
+void DocumentSourceFacet::addInvolvedCollections(
+ stdx::unordered_set<NamespaceString>* collectionNames) const {
for (auto&& facet : _facets) {
for (auto&& source : facet.pipeline->getSources()) {
- source->addInvolvedCollections(collections);
+ source->addInvolvedCollections(collectionNames);
}
}
}
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 6b8f2bcac46..8c82ecf7cb1 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -135,7 +135,7 @@ public:
}
// The following are overridden just to forward calls to sub-pipelines.
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final;
+ void addInvolvedCollections(stdx::unordered_set<NamespaceString>* involvedNssSet) const final;
void detachFromOperationContext() final;
void reattachToOperationContext(OperationContext* opCtx) final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index ee745dedda6..eea8b370cdf 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -476,7 +476,7 @@ DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
// We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage
// we'll eventually construct from the input document.
_fromPipeline.reserve(_fromPipeline.size() + 1);
- _fromPipeline.push_back(BSONObj());
+ _fromPipeline.push_back(BSON("$match" << BSONObj()));
}
intrusive_ptr<DocumentSourceGraphLookUp> DocumentSourceGraphLookUp::create(
@@ -602,4 +602,13 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
return std::move(newSource);
}
+
+void DocumentSourceGraphLookUp::addInvolvedCollections(
+ stdx::unordered_set<NamespaceString>* collectionNames) const {
+ collectionNames->insert(_fromExpCtx->ns);
+ auto introspectionPipeline = uassertStatusOK(Pipeline::parse(_fromPipeline, _fromExpCtx));
+ for (auto&& stage : introspectionPipeline->getSources()) {
+ stage->addInvolvedCollections(collectionNames);
+ }
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index fb6703c97dd..4077c058770 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -83,9 +83,7 @@ public:
return DepsTracker::State::SEE_NEXT;
};
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
- collections->push_back(_from);
- }
+ void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final;
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index a900b777073..8a3a97097d1 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -47,39 +47,20 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-namespace {
-std::string pipelineToString(const vector<BSONObj>& pipeline) {
- StringBuilder sb;
- sb << "[";
-
- auto first = true;
- for (auto& stageSpec : pipeline) {
- if (!first) {
- sb << ", ";
- } else {
- first = false;
- }
- sb << stageSpec;
- }
- sb << "]";
- return sb.str();
-}
-} // namespace
-
constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx),
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx),
_fromNs(std::move(fromNs)),
_as(std::move(as)),
- _variables(pExpCtx->variables),
- _variablesParseState(pExpCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
- const auto& resolvedNamespace = pExpCtx->getResolvedNamespace(_fromNs);
+ _variables(expCtx->variables),
+ _variablesParseState(expCtx->variablesParseState.copyWith(_variables.useIdGenerator())) {
+ const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs);
_resolvedNs = resolvedNamespace.ns;
_resolvedPipeline = resolvedNamespace.pipeline;
- _fromExpCtx = pExpCtx->copyWith(_resolvedNs);
+ _fromExpCtx = expCtx->copyWith(_resolvedNs);
_fromExpCtx->subPipelineDepth += 1;
uassert(ErrorCodes::MaxSubPipelineDepthExceeded,
@@ -92,22 +73,23 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::string localField,
std::string foreignField,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceLookUp(fromNs, as, pExpCtx) {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceLookUp(fromNs, as, expCtx) {
_localField = std::move(localField);
_foreignField = std::move(foreignField);
// We append an additional BSONObj to '_resolvedPipeline' as a placeholder for the $match stage
// we'll eventually construct from the input document.
_resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
- _resolvedPipeline.push_back(BSONObj());
+ _resolvedPipeline.push_back(BSON("$match" << BSONObj()));
+ initializeResolvedIntrospectionPipeline();
}
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
std::vector<BSONObj> pipeline,
BSONObj letVariables,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceLookUp(fromNs, as, pExpCtx) {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceLookUp(fromNs, as, expCtx) {
// '_resolvedPipeline' will first be initialized by the constructor delegated to within this
// constructor's initializer list. It will be populated with view pipeline prefix if 'fromNs'
// represents a view. We append the user 'pipeline' to the end of '_resolvedPipeline' to ensure
@@ -124,11 +106,11 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
_letVariables.emplace_back(
varName.toString(),
- Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
+ Expression::parseOperand(expCtx, varElem, expCtx->variablesParseState),
_variablesParseState.defineVariable(varName));
}
- initializeIntrospectionPipeline();
+ initializeResolvedIntrospectionPipeline();
}
std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
@@ -190,7 +172,7 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
// transaction requirement from the children in its pipeline.
if (wasConstructedWithPipelineSyntax()) {
const auto resolvedRequirements = StageConstraints::resolveDiskUseAndTransactionRequirement(
- _parsedIntrospectionPipeline->getSources());
+ _resolvedIntrospectionPipeline->getSources());
diskRequirement = resolvedRequirements.first;
txnRequirement = resolvedRequirements.second;
}
@@ -465,14 +447,6 @@ Pipeline::SourceContainer::iterator DocumentSourceLookUp::doOptimizeAt(
return itr;
}
-std::string DocumentSourceLookUp::getUserPipelineDefinition() {
- if (wasConstructedWithPipelineSyntax()) {
- return pipelineToString(_userPipeline);
- }
-
- return _resolvedPipeline.back().toString();
-}
-
bool DocumentSourceLookUp::usedDisk() {
if (_pipeline)
_usedDisk = _usedDisk || _pipeline->usedDisk();
@@ -654,11 +628,12 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl
}
}
-void DocumentSourceLookUp::initializeIntrospectionPipeline() {
+void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() {
copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
- _parsedIntrospectionPipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+ _resolvedIntrospectionPipeline =
+ uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
- auto& sources = _parsedIntrospectionPipeline->getSources();
+ auto& sources = _resolvedIntrospectionPipeline->getSources();
// Ensure that the pipeline does not contain a $changeStream stage. This check will be
// performed recursively on all sub-pipelines.
@@ -685,7 +660,7 @@ void DocumentSourceLookUp::serializeToArray(
auto pipeline = _userPipeline;
if (_additionalFilter) {
- pipeline.push_back(BSON("$match" << *_additionalFilter));
+ pipeline.emplace_back(BSON("$match" << *_additionalFilter));
}
doc = Document{{getSourceName(),
@@ -741,7 +716,7 @@ void DocumentSourceLookUp::serializeToArray(
DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
if (wasConstructedWithPipelineSyntax()) {
// We will use the introspection pipeline which we prebuilt during construction.
- invariant(_parsedIntrospectionPipeline);
+ invariant(_resolvedIntrospectionPipeline);
// We are not attempting to enforce that any referenced metadata are in fact available,
// this is done elsewhere. We only need to know what variable dependencies exist in the
@@ -752,7 +727,7 @@ DepsTracker::State DocumentSourceLookUp::getDependencies(DepsTracker* deps) cons
// Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
// declared by this $lookup and variables declared externally.
- for (auto&& source : _parsedIntrospectionPipeline->getSources()) {
+ for (auto&& source : _resolvedIntrospectionPipeline->getSources()) {
source->getDependencies(&subDeps);
}
@@ -839,7 +814,9 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
}
uassert(ErrorCodes::FailedToParse,
- str::stream() << "$lookup argument '" << argument << "' must be a string, is type "
+ str::stream() << "$lookup argument '" << argName << "' must be a string, found "
+ << argument
+ << ": "
<< argument.type(),
argument.type() == BSONType::String);
@@ -887,4 +864,13 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
pExpCtx);
}
}
+
+void DocumentSourceLookUp::addInvolvedCollections(
+ stdx::unordered_set<NamespaceString>* collectionNames) const {
+ collectionNames->insert(_resolvedNs);
+ for (auto&& stage : _resolvedIntrospectionPipeline->getSources()) {
+ stage->addInvolvedCollections(collectionNames);
+ }
}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index a657c022b44..dc233632496 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -126,14 +126,7 @@ public:
return MergingLogic{nullptr, this, boost::none};
}
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
- collections->push_back(_fromNs);
- if (_parsedIntrospectionPipeline) {
- for (auto&& stage : _parsedIntrospectionPipeline->getSources()) {
- stage->addInvolvedCollections(collections);
- }
- }
- }
+ void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final;
void detachFromOperationContext() final;
@@ -142,13 +135,13 @@ public:
bool usedDisk() final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
static boost::intrusive_ptr<DocumentSource> createFromBsonWithCacheSize(
BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
size_t maxCacheSizeBytes) {
- auto dsLookup = createFromBson(elem, pExpCtx);
+ auto dsLookup = createFromBson(elem, expCtx);
static_cast<DocumentSourceLookUp*>(dsLookup.get())->reInitializeCache(maxCacheSizeBytes);
return dsLookup;
}
@@ -215,7 +208,7 @@ private:
*/
DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Constructor used for a $lookup stage specified using the {from: ..., localField: ...,
@@ -225,7 +218,7 @@ private:
std::string as,
std::string localField,
std::string foreignField,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Constructor used for a $lookup stage specified using the {from: ..., pipeline: [...], as:
@@ -235,7 +228,7 @@ private:
std::string as,
std::vector<BSONObj> pipeline,
BSONObj letVariables,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Should not be called; use serializeToArray instead.
@@ -262,7 +255,7 @@ private:
* Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
* pipelines will be built recursively.
*/
- void initializeIntrospectionPipeline();
+ void initializeResolvedIntrospectionPipeline();
/**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
@@ -271,12 +264,6 @@ private:
std::unique_ptr<Pipeline, PipelineDeleter> buildPipeline(const Document& inputDoc);
/**
- * The pipeline supplied via the $lookup 'pipeline' argument. This may differ from pipeline that
- * is executed in that it will not include optimizations or resolved views.
- */
- std::string getUserPipelineDefinition();
-
- /**
* Reinitialize the cache with a new max size. May only be called if this DSLookup was created
* with pipeline syntax, the cache has not been frozen or abandoned, and no data has been added
* to it.
@@ -321,7 +308,7 @@ private:
std::vector<BSONObj> _userPipeline;
// A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
// functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
- std::unique_ptr<Pipeline, PipelineDeleter> _parsedIntrospectionPipeline;
+ std::unique_ptr<Pipeline, PipelineDeleter> _resolvedIntrospectionPipeline;
std::vector<LetVariable> _letVariables;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 80c175e3f73..677daa986f8 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_out_in_place.h"
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
@@ -158,7 +159,9 @@ void Pipeline::validateTopLevelPipeline() const {
uasserted(ErrorCodes::InvalidNamespace,
"{aggregate: 1} is not valid for an empty pipeline.");
}
- } else if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
+ return;
+ }
+ if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStageConstraints = _sources.front()->constraints(_splitState);
@@ -189,6 +192,21 @@ void Pipeline::validateTopLevelPipeline() const {
}
}
}
+ // Make sure we aren't reading and writing to the same namespace for an $out. This is
+ // allowed for mode "replaceCollection", but not for the in-place modes.
+ if (auto outStage = dynamic_cast<DocumentSourceOutInPlace*>(_sources.back().get())) {
+ const auto& outNss = outStage->getOutputNs();
+ stdx::unordered_set<NamespaceString> collectionNames;
+ // In order to gather only the involved namespaces which we are reading to, not the one we
+ // are writing from, skip the final stage as we know it is an $out stage.
+ for (auto it = _sources.begin(); it != std::prev(_sources.end()); it++) {
+ (*it)->addInvolvedCollections(&collectionNames);
+ }
+ uassert(51079,
+ "Cannot use $out to write to the same namespace being read from elsewhere in the "
+ "pipeline unless $out's mode is \"replaceCollection\"",
+ collectionNames.find(outNss) == collectionNames.end());
+ }
}
void Pipeline::validateFacetPipeline() const {
@@ -412,12 +430,12 @@ bool Pipeline::requiredToRunOnMongos() const {
return false;
}
-std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
- std::vector<NamespaceString> collections;
+stdx::unordered_set<NamespaceString> Pipeline::getInvolvedCollections() const {
+ stdx::unordered_set<NamespaceString> collectionNames;
for (auto&& source : _sources) {
- source->addInvolvedCollections(&collections);
+ source->addInvolvedCollections(&collectionNames);
}
- return collections;
+ return collectionNames;
}
vector<Value> Pipeline::serialize() const {
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 7c06f4adbd0..849143dd94e 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -233,9 +233,10 @@ public:
/**
* Returns any other collections involved in the pipeline in addition to the collection the
- * aggregation is run on.
+ * aggregation is run on. All namespaces returned are the names of collections, after views have
+ * been resolved.
*/
- std::vector<NamespaceString> getInvolvedCollections() const;
+ stdx::unordered_set<NamespaceString> getInvolvedCollections() const;
/**
* Serializes the pipeline into a form that can be parsed into an equivalent pipeline.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 963ead6ec2e..0c3d87617c1 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -39,7 +39,10 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_graph_lookup.h"
#include "mongo/db/pipeline/document_source_internal_split_pipeline.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -3034,6 +3037,158 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) {
ASSERT_EQ(nameMap["a"], "a");
}
+TEST(InvolvedNamespacesTest, NoInvolvedNamespacesForMatchSortProject) {
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(),
+ DocumentSourceMatch::create(BSON("x" << 1), expCtx),
+ DocumentSourceSort::create(expCtx, BSON("y" << -1)),
+ DocumentSourceProject::create(BSON("x" << 1 << "y" << 1), expCtx)},
+ expCtx));
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT(involvedNssSet.empty());
+}
+
+TEST(InvolvedNamespacesTest, IncludesLookupNamespace) {
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const NamespaceString lookupNss{"test", "foo"};
+ const NamespaceString resolvedNss{"test", "bar"};
+ expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}});
+ auto lookupSpec =
+ fromjson("{$lookup: {from: 'foo', as: 'x', localField: 'foo_id', foreignField: '_id'}}");
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(),
+ DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)},
+ expCtx));
+
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT_EQ(involvedNssSet.size(), 1UL);
+ ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end());
+}
+
+TEST(InvolvedNamespacesTest, IncludesGraphLookupNamespace) {
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const NamespaceString lookupNss{"test", "foo"};
+ const NamespaceString resolvedNss{"test", "bar"};
+ expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}});
+ auto graphLookupSpec = fromjson(
+ "{$graphLookup: {"
+ " from: 'foo',"
+ " as: 'x',"
+ " connectFromField: 'x',"
+ " connectToField: 'y',"
+ " startWith: '$start'"
+ "}}");
+ auto pipeline = unittest::assertGet(Pipeline::create(
+ {DocumentSourceMock::create(),
+ DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)},
+ expCtx));
+
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT_EQ(involvedNssSet.size(), 1UL);
+ ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end());
+}
+
+TEST(InvolvedNamespacesTest, IncludesLookupSubpipelineNamespaces) {
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const NamespaceString outerLookupNss{"test", "foo_outer"};
+ const NamespaceString outerResolvedNss{"test", "bar_outer"};
+ const NamespaceString innerLookupNss{"test", "foo_inner"};
+ const NamespaceString innerResolvedNss{"test", "bar_inner"};
+ expCtx->setResolvedNamespace(outerLookupNss, {outerResolvedNss, vector<BSONObj>{}});
+ expCtx->setResolvedNamespace(innerLookupNss, {innerResolvedNss, vector<BSONObj>{}});
+ auto lookupSpec = fromjson(
+ "{$lookup: {"
+ " from: 'foo_outer', "
+ " as: 'x', "
+ " pipeline: [{$lookup: {from: 'foo_inner', as: 'y', pipeline: []}}]"
+ "}}");
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(),
+ DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)},
+ expCtx));
+
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT_EQ(involvedNssSet.size(), 2UL);
+ ASSERT(involvedNssSet.find(outerResolvedNss) != involvedNssSet.end());
+ ASSERT(involvedNssSet.find(innerResolvedNss) != involvedNssSet.end());
+}
+
+TEST(InvolvedNamespacesTest, IncludesGraphLookupSubPipeline) {
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const NamespaceString outerLookupNss{"test", "foo_outer"};
+ const NamespaceString outerResolvedNss{"test", "bar_outer"};
+ const NamespaceString innerLookupNss{"test", "foo_inner"};
+ const NamespaceString innerResolvedNss{"test", "bar_inner"};
+ expCtx->setResolvedNamespace(outerLookupNss, {outerResolvedNss, vector<BSONObj>{}});
+ expCtx->setResolvedNamespace(
+ outerLookupNss,
+ {outerResolvedNss,
+ vector<BSONObj>{fromjson("{$lookup: {from: 'foo_inner', as: 'x', pipeline: []}}")}});
+ expCtx->setResolvedNamespace(innerLookupNss, {innerResolvedNss, vector<BSONObj>{}});
+ auto graphLookupSpec = fromjson(
+ "{$graphLookup: {"
+ " from: 'foo_outer', "
+ " as: 'x', "
+ " connectFromField: 'x',"
+ " connectToField: 'y',"
+ " startWith: '$start'"
+ "}}");
+ auto pipeline = unittest::assertGet(Pipeline::create(
+ {DocumentSourceMock::create(),
+ DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)},
+ expCtx));
+
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT_EQ(involvedNssSet.size(), 2UL);
+ ASSERT(involvedNssSet.find(outerResolvedNss) != involvedNssSet.end());
+ ASSERT(involvedNssSet.find(innerResolvedNss) != involvedNssSet.end());
+}
+
+TEST(InvolvedNamespacesTest, IncludesAllCollectionsWhenResolvingViews) {
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+ const NamespaceString normalCollectionNss{"test", "collection"};
+ const NamespaceString lookupNss{"test", "foo"};
+ const NamespaceString resolvedNss{"test", "bar"};
+ const NamespaceString nssIncludedInResolvedView{"test", "extra_backer_of_bar"};
+ expCtx->setResolvedNamespace(
+ lookupNss,
+ {resolvedNss,
+ vector<BSONObj>{
+ fromjson("{$lookup: {from: 'extra_backer_of_bar', as: 'x', pipeline: []}}")}});
+ expCtx->setResolvedNamespace(nssIncludedInResolvedView,
+ {nssIncludedInResolvedView, vector<BSONObj>{}});
+ expCtx->setResolvedNamespace(normalCollectionNss, {normalCollectionNss, vector<BSONObj>{}});
+ auto facetSpec = fromjson(
+ "{$facet: {"
+ " pipe_1: ["
+ " {$lookup: {"
+ " from: 'foo',"
+ " as: 'x',"
+ " localField: 'foo_id',"
+ " foreignField: '_id'"
+ " }}"
+ " ],"
+ " pipe_2: ["
+ " {$lookup: {"
+ " from: 'collection',"
+ " as: 'z',"
+ " pipeline: []"
+ " }}"
+ " ]"
+ "}}");
+ auto pipeline = unittest::assertGet(
+ Pipeline::create({DocumentSourceMock::create(),
+ DocumentSourceFacet::createFromBson(facetSpec.firstElement(), expCtx)},
+ expCtx));
+
+ auto involvedNssSet = pipeline->getInvolvedCollections();
+ ASSERT_EQ(involvedNssSet.size(), 3UL);
+ ASSERT(involvedNssSet.find(resolvedNss) != involvedNssSet.end());
+ ASSERT(involvedNssSet.find(nssIncludedInResolvedView) != involvedNssSet.end());
+ ASSERT(involvedNssSet.find(normalCollectionNss) != involvedNssSet.end());
+}
+
} // namespace
class All : public Suite {