summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyle Suarez <kyle.suarez@mongodb.com>2018-09-14 12:17:07 -0400
committerKyle Suarez <kyle.suarez@mongodb.com>2018-09-14 13:51:15 -0400
commit7e8486ba84837a548f2f1470209eb204a42c293a (patch)
treea867539db74beba9368012efb0229d44f05d6f7b
parent8f9cf06033d7b1e0942c76eecfb69b5eee044ed6 (diff)
downloadmongo-7e8486ba84837a548f2f1470209eb204a42c293a.tar.gz
SERVER-35419 $lookup and $facet must inherit constraints from children
By default, $lookup and $facet do not write persistent data and are allowed in a transaction. However, both stages must inherit the "strictest" disk use requirement of any stage in their sub-pipelines, and can only be used in a transaction if each of those pipelines contain only transaction-compatible stages.
-rw-r--r--jstests/core/views/views_creation.js16
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source.h212
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h2
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp104
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp25
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h25
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp44
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp12
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
-rw-r--r--src/mongo/db/pipeline/stage_constraints.cpp36
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h289
15 files changed, 538 insertions, 258 deletions
diff --git a/jstests/core/views/views_creation.js b/jstests/core/views/views_creation.js
index 68c896118a5..b320fd711ae 100644
--- a/jstests/core/views/views_creation.js
+++ b/jstests/core/views/views_creation.js
@@ -77,12 +77,24 @@
assert.commandFailedWithCode(
viewsDB.runCommand({create: "dollar$", viewOn: "collection", pipeline: pipe}),
ErrorCodes.InvalidNamespace);
+
+ // You cannot create a view with a $out stage, by itself or nested inside of a different stage.
+ const outStage = {$out: "nonExistentCollection"};
+ assert.commandFailedWithCode(
+ viewsDB.runCommand({create: "viewWithOut", viewOn: "collection", pipeline: [outStage]}),
+ ErrorCodes.OptionNotSupportedOnView);
assert.commandFailedWithCode(viewsDB.runCommand({
- create: "viewWithBadPipeline",
+ create: "viewWithOutInLookup",
viewOn: "collection",
- pipeline: [{$project: {_id: false}}, {$out: "notExistingCollection"}]
+ pipeline: [{$lookup: {from: "other", pipeline: [outStage], as: "result"}}]
}),
ErrorCodes.OptionNotSupportedOnView);
+ assert.commandFailedWithCode(viewsDB.runCommand({
+ create: "viewWithOutInFacet",
+ viewOn: "collection",
+ pipeline: [{$facet: {output: [outStage]}}]
+ }),
+ 40600);
// These test that, when an existing view in system.views is invalid because of a $out in the
// pipeline, the database errors on creation of a new view.
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 40d1afff528..06c490a90d1 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -354,6 +354,7 @@ pipelineeEnv.Library(
'document_source_unwind.cpp',
'pipeline.cpp',
'sequential_document_cache.cpp',
+ 'stage_constraints.cpp',
'tee_buffer.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index e9bf60dba55..ea11eea4cc3 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -50,6 +50,7 @@
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/stage_constraints.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/stdx/functional.h"
@@ -130,217 +131,6 @@ public:
using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
- /**
- * A struct describing various constraints about where this stage can run, where it must be in
- * the pipeline, what resources it may require, etc.
- */
- struct StageConstraints {
- /**
- * A StreamType defines whether this stage is streaming (can produce output based solely on
- * the current input document) or blocking (must examine subsequent documents before
- * producing an output document).
- */
- enum class StreamType { kStreaming, kBlocking };
-
- /**
- * A PositionRequirement stipulates what specific position the stage must occupy within the
- * pipeline, if any.
- */
- enum class PositionRequirement { kNone, kFirst, kLast };
-
- /**
- * A HostTypeRequirement defines where this stage is permitted to be executed when the
- * pipeline is run on a sharded cluster.
- */
- enum class HostTypeRequirement {
- // Indicates that the stage can run either on mongoD or mongoS.
- kNone,
- // Indicates that the stage must run on the host to which it was originally sent and
- // cannot be forwarded from mongoS to the shards.
- kLocalOnly,
- // Indicates that the stage must run on the primary shard.
- kPrimaryShard,
- // Indicates that the stage must run on any participating shard.
- kAnyShard,
- // Indicates that the stage can only run on mongoS.
- kMongoS,
- };
-
- /**
- * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or
- * whether it may spill temporary data to disk if its memory usage exceeds a given
- * threshold. Note that this only indicates that the stage has the ability to spill; if
- * 'allowDiskUse' is set to false, it will be prevented from doing so.
- */
- enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
-
- /**
- * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream
- * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is
- * blacklisted from $changeStream.
- */
- enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist };
-
- /**
- * A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
- */
- enum class FacetRequirement { kAllowed, kNotAllowed };
-
- /**
- * Indicates whether or not this stage is legal when the read concern for the aggregate has
- * readConcern level "snapshot" or is running inside of a multi-document transaction.
- */
- enum class TransactionRequirement { kNotAllowed, kAllowed };
-
- StageConstraints(
- StreamType streamType,
- PositionRequirement requiredPosition,
- HostTypeRequirement hostRequirement,
- DiskUseRequirement diskRequirement,
- FacetRequirement facetRequirement,
- TransactionRequirement transactionRequirement,
- ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
- : requiredPosition(requiredPosition),
- hostRequirement(hostRequirement),
- diskRequirement(diskRequirement),
- changeStreamRequirement(changeStreamRequirement),
- facetRequirement(facetRequirement),
- transactionRequirement(transactionRequirement),
- streamType(streamType) {
- // Stages which are allowed to run in $facet must not have any position requirements.
- invariant(
- !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));
-
- // No change stream stages are permitted to run in a $facet pipeline.
- invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));
-
- // Only streaming stages are permitted in $changeStream pipelines.
- invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));
-
- // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
- // shard, since it needs to be able to run on mongoS in a cluster.
- invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
- (hostRequirement == HostTypeRequirement::kAnyShard ||
- hostRequirement == HostTypeRequirement::kPrimaryShard)));
-
- // A stage which is whitelisted for $changeStream cannot have a position requirement.
- invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
- requiredPosition != PositionRequirement::kNone));
-
- // Change stream stages should not be permitted with readConcern level "snapshot" or
- // inside of a multi-document transaction.
- if (isChangeStreamStage()) {
- invariant(!isAllowedInTransaction());
- }
-
- // Stages which write data to user collections should not be permitted with readConcern
- // level "snapshot" or inside of a multi-document transaction.
- if (diskRequirement == DiskUseRequirement::kWritesPersistentData) {
- invariant(!isAllowedInTransaction());
- }
- }
-
- /**
- * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the
- * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified.
- */
- HostTypeRequirement resolvedHostTypeRequirement(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
- return (hostRequirement != HostTypeRequirement::kLocalOnly
- ? hostRequirement
- : (expCtx->inMongos ? HostTypeRequirement::kMongoS
- : HostTypeRequirement::kAnyShard));
- }
-
- /**
- * True if this stage must run on the same host to which it was originally sent.
- */
- bool mustRunLocally() const {
- return hostRequirement == HostTypeRequirement::kLocalOnly;
- }
-
- /**
- * True if this stage is permitted to run in a $facet pipeline.
- */
- bool isAllowedInsideFacetStage() const {
- return facetRequirement == FacetRequirement::kAllowed;
- }
-
- /**
- * True if this stage is permitted to run in a pipeline which starts with $changeStream.
- */
- bool isAllowedInChangeStream() const {
- return changeStreamRequirement != ChangeStreamRequirement::kBlacklist;
- }
-
- /**
- * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed
- * to run in a pipeline which begins with $changeStream.
- */
- bool isChangeStreamStage() const {
- return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
- }
-
- /**
- * Returns true if this stage is legal when the readConcern level is "snapshot" or when this
- * aggregation is being run within a multi-document transaction.
- */
- bool isAllowedInTransaction() const {
- return transactionRequirement == TransactionRequirement::kAllowed;
- }
-
- /**
- * Returns true if this stage writes persistent data to disk.
- */
- bool writesPersistentData() const {
- return diskRequirement == DiskUseRequirement::kWritesPersistentData;
- }
-
- // Indicates whether this stage needs to be at a particular position in the pipeline.
- const PositionRequirement requiredPosition;
-
- // Indicates whether this stage can only be executed on specific components of a sharded
- // cluster.
- const HostTypeRequirement hostRequirement;
-
- // Indicates whether this stage may write persistent data to disk, or may spill to temporary
- // files if its memory usage becomes excessive.
- const DiskUseRequirement diskRequirement;
-
- // Indicates whether this stage is itself a $changeStream stage, or if not whether it may
- // exist in a pipeline which begins with $changeStream.
- const ChangeStreamRequirement changeStreamRequirement;
-
- // Indicates whether this stage may run inside a $facet stage.
- const FacetRequirement facetRequirement;
-
- // Indicates whether this stage is legal when the readConcern level is "snapshot" or the
- // aggregate is running inside of a multi-document transaction.
- const TransactionRequirement transactionRequirement;
-
- // Indicates whether this is a streaming or blocking stage.
- const StreamType streamType;
-
- // True if this stage does not generate results itself, and instead pulls inputs from an
- // input DocumentSource (via 'pSource').
- bool requiresInputDocSource = true;
-
- // True if this stage operates on a global or database level, like $currentOp.
- bool isIndependentOfAnyCollection = false;
-
- // True if this stage can ever be safely swapped with a subsequent $match stage, provided
- // that the match does not depend on the paths returned by getModifiedPaths().
- //
- // Stages that want to participate in match swapping should set this to true. Such a stage
- // must also override getModifiedPaths() to provide information about which particular
- // $match predicates be swapped before itself.
- bool canSwapWithMatch = false;
-
- // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is
- // true if this stage does not add or remove documents from the pipeline.
- bool canSwapWithLimit = false;
- };
-
using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PositionRequirement = StageConstraints::PositionRequirement;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 5ad06bd9329..2e452b70800 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -112,9 +112,7 @@ const char* DocumentSourceOplogMatch::getSourceName() const {
return DocumentSourceChangeStream::kStageName.rawData();
}
-DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints(
- Pipeline::SplitState pipeState) const {
-
+StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipeState) const {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 9e49dedcd1f..cb86e61b65a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -117,7 +117,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
}
}
-DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(
+StageConstraints DocumentSourceChangeStreamTransform::constraints(
Pipeline::SplitState pipeState) const {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index d0b2450fb91..c569b625d0b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -51,7 +51,7 @@ public:
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const;
- DocumentSource::StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final;
DocumentSource::GetNextResult getNext();
const char* getSourceName() const {
return DocumentSourceChangeStream::kStageName.rawData();
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 4f2a2dd8547..55efe9538fd 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -234,15 +234,7 @@ void DocumentSourceFacet::reattachToOperationContext(OperationContext* opCtx) {
}
}
-DocumentSource::StageConstraints DocumentSourceFacet::constraints(
- Pipeline::SplitState pipeState) const {
- const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
- const auto sources = facet.pipeline->getSources();
- return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
- return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData;
- });
- });
-
+StageConstraints DocumentSourceFacet::constraints(Pipeline::SplitState) const {
// Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
// This means that if any stage in any of the $facet pipelines needs to run on the primary shard
// or on mongoS, then the entire $facet stage must run there.
@@ -266,12 +258,20 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints(
}
}
+ // Resolve the disk use and transaction requirement of this $facet by iterating through the
+ // children in its facets.
+ auto diskAndTxnReq = StageConstraints::kDefaultDiskUseAndTransactionRequirement;
+ for (const auto& facet : _facets) {
+ diskAndTxnReq = StageConstraints::resolveDiskUseAndTransactionRequirement(
+ facet.pipeline->getSources(), diskAndTxnReq);
+ }
+
return {StreamType::kBlocking,
PositionRequirement::kNone,
host,
- mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
+ std::get<StageConstraints::DiskUseRequirement>(diskAndTxnReq),
FacetRequirement::kNotAllowed,
- TransactionRequirement::kAllowed};
+ std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq)};
}
bool DocumentSourceFacet::usedDisk() {
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 9a367f14b24..6a7e52c338b 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -229,6 +229,55 @@ public:
}
};
+TEST_F(DocumentSourceFacetTest, PassthroughFacetDoesntRequireDiskAndIsOKInaTxn) {
+ auto ctx = getExpCtx();
+ auto passthrough = DocumentSourcePassthrough::create();
+ auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx));
+
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("passthrough", std::move(passthroughPipe));
+
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ DocumentSource::DiskUseRequirement::kNoDiskUse);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ DocumentSource::TransactionRequirement::kAllowed);
+}
+
+/**
+ * A dummy DocumentSource which writes persistent data.
+ */
+class DocumentSourceWritesPersistentData final : public DocumentSourcePassthrough {
+public:
+ StageConstraints constraints(Pipeline::SplitState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesPersistentData,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kNotAllowed};
+ }
+
+ static boost::intrusive_ptr<DocumentSourceWritesPersistentData> create() {
+ return new DocumentSourceWritesPersistentData();
+ }
+};
+
+TEST_F(DocumentSourceFacetTest, FacetWithChildThatWritesDataAlsoReportsWritingData) {
+ auto ctx = getExpCtx();
+ auto writesDataStage = DocumentSourceWritesPersistentData::create();
+ auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({writesDataStage}, ctx));
+
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("writes", std::move(pipeline));
+
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ DocumentSource::DiskUseRequirement::kWritesPersistentData);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ DocumentSource::TransactionRequirement::kNotAllowed);
+}
+
TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) {
auto ctx = getExpCtx();
@@ -697,7 +746,11 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement ==
- DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard);
+ StageConstraints::HostTypeRequirement::kPrimaryShard);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ StageConstraints::DiskUseRequirement::kNoDiskUse);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ StageConstraints::TransactionRequirement::kAllowed);
}
TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
@@ -717,8 +770,55 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement ==
- DocumentSource::StageConstraints::HostTypeRequirement::kNone);
+ StageConstraints::HostTypeRequirement::kNone);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ StageConstraints::DiskUseRequirement::kNoDiskUse);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ StageConstraints::TransactionRequirement::kAllowed);
}
+/**
+ * A dummy DocumentSource that must run on the primary shard, can write temporary data and can't be
+ * used in a transaction.
+ */
+class DocumentSourcePrimaryShardTmpDataNoTxn final : public DocumentSourcePassthrough {
+public:
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kNotAllowed};
+ }
+
+ static boost::intrusive_ptr<DocumentSourcePrimaryShardTmpDataNoTxn> create() {
+ return new DocumentSourcePrimaryShardTmpDataNoTxn();
+ }
+};
+
+TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstraint) {
+ auto ctx = getExpCtx();
+
+ auto firstPassthrough = DocumentSourcePassthrough::create();
+ auto firstPipeline =
+ unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx));
+
+ auto secondPassthrough = DocumentSourcePrimaryShardTmpDataNoTxn::create();
+ auto secondPipeline =
+ unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx));
+
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("first", std::move(firstPipeline));
+ facets.emplace_back("second", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
+
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement ==
+ StageConstraints::HostTypeRequirement::kPrimaryShard);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ StageConstraints::DiskUseRequirement::kWritesTmpData);
+ ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ StageConstraints::TransactionRequirement::kNotAllowed);
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 16e20491f04..4762916ee3a 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -180,6 +180,31 @@ const char* DocumentSourceLookUp::getSourceName() const {
return "$lookup";
}
+StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
+ // By default, $lookup is allowed in a transaction and does not use disk.
+ auto diskRequirement = DiskUseRequirement::kNoDiskUse;
+ auto txnRequirement = TransactionRequirement::kAllowed;
+
+ // However, if $lookup is specified with a pipeline, it inherits the strictest disk use and
+ // transaction requirement from the children in its pipeline.
+ if (wasConstructedWithPipelineSyntax()) {
+ const auto resolvedRequirements = StageConstraints::resolveDiskUseAndTransactionRequirement(
+ _parsedIntrospectionPipeline->getSources());
+ diskRequirement = resolvedRequirements.first;
+ txnRequirement = resolvedRequirements.second;
+ }
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ diskRequirement,
+ FacetRequirement::kAllowed,
+ txnRequirement);
+
+ constraints.canSwapWithMatch = true;
+ return constraints;
+}
+
namespace {
/**
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 5276aa917c4..de12c821443 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -103,26 +103,11 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
- std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
- _parsedIntrospectionPipeline->getSources().end(),
- [](const auto& source) {
- return source->constraints().diskRequirement ==
- DiskUseRequirement::kWritesTmpData;
- });
-
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kPrimaryShard,
- mayUseDisk ? DiskUseRequirement::kWritesTmpData
- : DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed,
- TransactionRequirement::kAllowed);
-
- constraints.canSwapWithMatch = true;
- return constraints;
- }
+ /**
+ * Reports the StageConstraints of this $lookup instance. A $lookup constructed with pipeline
+ * syntax will inherit certain constraints from the stages in its pipeline.
+ */
+ StageConstraints constraints(Pipeline::SplitState) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 00c0baeecdb..f516626a47e 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -193,6 +193,50 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) {
ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax());
}
+TEST_F(DocumentSourceLookUpTest, LookupEmptyPipelineDoesntUseDiskAndIsOKInATransaction) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource = DocumentSourceLookUp::createFromBson(
+ BSON("$lookup" << BSON("from" << fromNs.coll().toString() << "pipeline" << BSONArray()
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get());
+
+ ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax());
+ ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ DocumentSource::DiskUseRequirement::kNoDiskUse);
+ ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ DocumentSource::TransactionRequirement::kAllowed);
+}
+
+TEST_F(DocumentSourceLookUpTest, LookupWithChildStagesInheritsDiskUseRequirement) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}});
+
+ auto docSource =
+ DocumentSourceLookUp::createFromBson(BSON("$lookup" << BSON("from"
+ << "coll"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$out"
+ << "target"))
+ << "as"
+ << "as"))
+ .firstElement(),
+ expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get());
+
+ ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax());
+ ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).diskRequirement ==
+ DocumentSource::DiskUseRequirement::kWritesPersistentData);
+ ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement ==
+ DocumentSource::TransactionRequirement::kNotAllowed);
+}
+
TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedNamespaces) {
auto stageSpec =
BSON("$lookup" << BSON("from"
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 3c84881323f..6ac5e76d21d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -69,12 +69,12 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
-using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
-using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
-using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
-using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
-using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
-using StreamType = DocumentSource::StageConstraints::StreamType;
+using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
+using HostTypeRequirement = StageConstraints::HostTypeRequirement;
+using PositionRequirement = StageConstraints::PositionRequirement;
+using DiskUseRequirement = StageConstraints::DiskUseRequirement;
+using FacetRequirement = StageConstraints::FacetRequirement;
+using StreamType = StageConstraints::StreamType;
constexpr MatchExpressionParser::AllowedFeatureSet Pipeline::kAllowedMatcherFeatures;
constexpr MatchExpressionParser::AllowedFeatureSet Pipeline::kGeoNearMatcherFeatures;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 280473848cc..64b0dcfe7c2 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2187,7 +2187,7 @@ public:
}
};
-using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
+using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PipelineMustRunOnMongoSTest = AggregationContextFixture;
TEST_F(PipelineMustRunOnMongoSTest, UnsplittablePipelineMustRunOnMongoS) {
diff --git a/src/mongo/db/pipeline/stage_constraints.cpp b/src/mongo/db/pipeline/stage_constraints.cpp
new file mode 100644
index 00000000000..8f937e09089
--- /dev/null
+++ b/src/mongo/db/pipeline/stage_constraints.cpp
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/stage_constraints.h"
+
+namespace mongo {
+constexpr StageConstraints::DiskUseAndTransactionRequirement
+ StageConstraints::kDefaultDiskUseAndTransactionRequirement;
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
new file mode 100644
index 00000000000..c55430ea181
--- /dev/null
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -0,0 +1,289 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <numeric>
+
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+/**
+ * A struct describing various constraints about where this stage can run, where it must be in
+ * the pipeline, what resources it may require, etc.
+ */
+struct StageConstraints {
+ /**
+ * A StreamType defines whether this stage is streaming (can produce output based solely on
+ * the current input document) or blocking (must examine subsequent documents before
+ * producing an output document).
+ */
+ enum class StreamType { kStreaming, kBlocking };
+
+ /**
+ * A PositionRequirement stipulates what specific position the stage must occupy within the
+ * pipeline, if any.
+ */
+ enum class PositionRequirement { kNone, kFirst, kLast };
+
+ /**
+ * A HostTypeRequirement defines where this stage is permitted to be executed when the
+ * pipeline is run on a sharded cluster.
+ */
+ enum class HostTypeRequirement {
+ // Indicates that the stage can run either on mongoD or mongoS.
+ kNone,
+ // Indicates that the stage must run on the host to which it was originally sent and
+ // cannot be forwarded from mongoS to the shards.
+ kLocalOnly,
+ // Indicates that the stage must run on the primary shard.
+ kPrimaryShard,
+ // Indicates that the stage must run on any participating shard.
+ kAnyShard,
+ // Indicates that the stage can only run on mongoS.
+ kMongoS,
+ };
+
+ /**
+ * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or
+ * whether it may spill temporary data to disk if its memory usage exceeds a given
+ * threshold. Note that this only indicates that the stage has the ability to spill; if
+ * 'allowDiskUse' is set to false, it will be prevented from doing so.
+ *
+ * This enum is purposefully ordered such that a "stronger" need to write data has a higher
+ * enum value.
+ */
+ enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
+
+ /**
+ * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream
+ * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is
+ * blacklisted from $changeStream.
+ */
+ enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist };
+
+ /**
+ * A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
+ */
+ enum class FacetRequirement { kAllowed, kNotAllowed };
+
+ /**
+ * Indicates whether or not this stage is legal when the read concern for the aggregate has
+ * readConcern level "snapshot" or is running inside of a multi-document transaction.
+ */
+ enum class TransactionRequirement { kNotAllowed, kAllowed };
+
+ using DiskUseAndTransactionRequirement = std::pair<DiskUseRequirement, TransactionRequirement>;
+
+ /**
+ * By default, a stage is assumed to use no disk and be allowed to run in a transaction.
+ */
+ static constexpr auto kDefaultDiskUseAndTransactionRequirement =
+ std::make_pair(DiskUseRequirement::kNoDiskUse, TransactionRequirement::kAllowed);
+
+ /**
+ * Given a 'pipeline' of DocumentSources, resolves the container's disk use requirement and
+ * transaction requirement:
+ *
+ * - Returns the "strictest" DiskUseRequirement reported by the stages in 'pipeline',
+ * where the strictness order is kNone < kWritesTmpData < kWritesPersistentData. For example,
+ * in a pipeline where all three DiskUseRequirements are present, the return value will be
+ * DiskUseRequirement::kWritesPersistentData.
+ *
+ * - Returns TransactionRequirement::kAllowed if and only if every DocumentSource in
+ * 'pipeline' is allowed in a transaction.
+ */
+ template <typename DocumentSourceContainer>
+ static DiskUseAndTransactionRequirement resolveDiskUseAndTransactionRequirement(
+ const DocumentSourceContainer& pipeline,
+ DiskUseAndTransactionRequirement defaultReqs = kDefaultDiskUseAndTransactionRequirement) {
+ return std::accumulate(
+ pipeline.begin(),
+ pipeline.end(),
+ defaultReqs,
+ [](const DiskUseAndTransactionRequirement& constraints, const auto& stage) {
+ const auto stageConstraints = stage->constraints();
+ const auto diskUse = std::max(constraints.first, stageConstraints.diskRequirement);
+ const auto txnReq =
+ std::min(constraints.second, stageConstraints.transactionRequirement);
+ return std::make_pair(diskUse, txnReq);
+ });
+ }
+
+ StageConstraints(
+ StreamType streamType,
+ PositionRequirement requiredPosition,
+ HostTypeRequirement hostRequirement,
+ DiskUseRequirement diskRequirement,
+ FacetRequirement facetRequirement,
+ TransactionRequirement transactionRequirement,
+ ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
+ : requiredPosition(requiredPosition),
+ hostRequirement(hostRequirement),
+ diskRequirement(diskRequirement),
+ changeStreamRequirement(changeStreamRequirement),
+ facetRequirement(facetRequirement),
+ transactionRequirement(transactionRequirement),
+ streamType(streamType) {
+ // Stages which are allowed to run in $facet must not have any position requirements.
+ invariant(!(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));
+
+ // No change stream stages are permitted to run in a $facet pipeline.
+ invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));
+
+ // Only streaming stages are permitted in $changeStream pipelines.
+ invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));
+
+ // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
+ // shard, since it needs to be able to run on mongoS in a cluster.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ (hostRequirement == HostTypeRequirement::kAnyShard ||
+ hostRequirement == HostTypeRequirement::kPrimaryShard)));
+
+ // A stage which is whitelisted for $changeStream cannot have a position requirement.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ requiredPosition != PositionRequirement::kNone));
+
+ // Change stream stages should not be permitted with readConcern level "snapshot" or
+ // inside of a multi-document transaction.
+ if (isChangeStreamStage()) {
+ invariant(!isAllowedInTransaction());
+ }
+
+ // Stages which write data to user collections should not be permitted with readConcern
+ // level "snapshot" or inside of a multi-document transaction.
+ // TODO (SERVER-36259): relax this requirement when $out (which writes persistent data)
+ // is allowed in a transaction.
+ if (diskRequirement == DiskUseRequirement::kWritesPersistentData) {
+ invariant(!isAllowedInTransaction());
+ }
+ }
+
+ /**
+ * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the
+ * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified.
+ */
+ HostTypeRequirement resolvedHostTypeRequirement(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
+ return (hostRequirement != HostTypeRequirement::kLocalOnly
+ ? hostRequirement
+ : (expCtx->inMongos ? HostTypeRequirement::kMongoS
+ : HostTypeRequirement::kAnyShard));
+ }
+
+ /**
+ * True if this stage must run on the same host to which it was originally sent.
+ */
+ bool mustRunLocally() const {
+ return hostRequirement == HostTypeRequirement::kLocalOnly;
+ }
+
+ /**
+ * True if this stage is permitted to run in a $facet pipeline.
+ */
+ bool isAllowedInsideFacetStage() const {
+ return facetRequirement == FacetRequirement::kAllowed;
+ }
+
+ /**
+ * True if this stage is permitted to run in a pipeline which starts with $changeStream.
+ */
+ bool isAllowedInChangeStream() const {
+ return changeStreamRequirement != ChangeStreamRequirement::kBlacklist;
+ }
+
+ /**
+ * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed
+ * to run in a pipeline which begins with $changeStream.
+ */
+ bool isChangeStreamStage() const {
+ return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
+ }
+
+ /**
+ * Returns true if this stage is legal when the readConcern level is "snapshot" or when this
+ * aggregation is being run within a multi-document transaction.
+ */
+ bool isAllowedInTransaction() const {
+ return transactionRequirement == TransactionRequirement::kAllowed;
+ }
+
+ /**
+ * Returns true if this stage writes persistent data to disk.
+ */
+ bool writesPersistentData() const {
+ return diskRequirement == DiskUseRequirement::kWritesPersistentData;
+ }
+
+ // Indicates whether this stage needs to be at a particular position in the pipeline.
+ const PositionRequirement requiredPosition;
+
+ // Indicates whether this stage can only be executed on specific components of a sharded
+ // cluster.
+ const HostTypeRequirement hostRequirement;
+
+ // Indicates whether this stage may write persistent data to disk, or may spill to temporary
+ // files if its memory usage becomes excessive.
+ const DiskUseRequirement diskRequirement;
+
+ // Indicates whether this stage is itself a $changeStream stage, or if not whether it may
+ // exist in a pipeline which begins with $changeStream.
+ const ChangeStreamRequirement changeStreamRequirement;
+
+ // Indicates whether this stage may run inside a $facet stage.
+ const FacetRequirement facetRequirement;
+
+ // Indicates whether this stage is legal when the readConcern level is "snapshot" or the
+ // aggregate is running inside of a multi-document transaction.
+ const TransactionRequirement transactionRequirement;
+
+ // Indicates whether this is a streaming or blocking stage.
+ const StreamType streamType;
+
+ // True if this stage does not generate results itself, and instead pulls inputs from an
+ // input DocumentSource (via 'pSource').
+ bool requiresInputDocSource = true;
+
+ // True if this stage operates on a global or database level, like $currentOp.
+ bool isIndependentOfAnyCollection = false;
+
+ // True if this stage can ever be safely swapped with a subsequent $match stage, provided
+ // that the match does not depend on the paths returned by getModifiedPaths().
+ //
+ // Stages that want to participate in match swapping should set this to true. Such a stage
+ // must also override getModifiedPaths() to provide information about which particular
+ // $match predicates be swapped before itself.
+ bool canSwapWithMatch = false;
+
+ // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is
+ // true if this stage does not add or remove documents from the pipeline.
+ bool canSwapWithLimit = false;
+};
+} // namespace mongo