author | Kyle Suarez <kyle.suarez@mongodb.com> | 2018-09-14 12:17:07 -0400
committer | Kyle Suarez <kyle.suarez@mongodb.com> | 2018-09-14 13:51:15 -0400
commit | 7e8486ba84837a548f2f1470209eb204a42c293a (patch)
tree | a867539db74beba9368012efb0229d44f05d6f7b
parent | 8f9cf06033d7b1e0942c76eecfb69b5eee044ed6 (diff)
download | mongo-7e8486ba84837a548f2f1470209eb204a42c293a.tar.gz
SERVER-35419 $lookup and $facet must inherit constraints from children
By default, $lookup and $facet do not write persistent data and are
allowed in a transaction. However, both stages must inherit the
"strictest" disk use requirement of any stage in their sub-pipelines,
and can only be used in a transaction if each of those pipelines contains
only transaction-compatible stages.
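As a rough illustration of the rule the commit message describes (not the server code itself), the standalone C++ sketch below folds the constraints of every stage in a sub-pipeline into a single result: the strictest disk-use requirement wins, and the transaction flag stays "allowed" only if every stage allows it. The real implementation is StageConstraints::resolveDiskUseAndTransactionRequirement in stage_constraints.h in the diff below; the enum names here mirror that header, but the resolve() helper and main() are illustrative stand-ins, not MongoDB APIs.

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Stand-ins for the real enums. They are ordered so that a "stronger" need to
// write data compares greater, and "not allowed in a transaction" compares
// lesser, matching the ordering stage_constraints.h relies on.
enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
enum class TransactionRequirement { kNotAllowed, kAllowed };

using DiskUseAndTransactionRequirement =
    std::pair<DiskUseRequirement, TransactionRequirement>;

// Folds per-stage constraints into one pair: the strictest disk-use
// requirement seen, and kAllowed only if every stage is transaction-safe.
DiskUseAndTransactionRequirement resolve(
    const std::vector<DiskUseAndTransactionRequirement>& stageConstraints) {
    auto acc = std::make_pair(DiskUseRequirement::kNoDiskUse,
                              TransactionRequirement::kAllowed);
    for (const auto& c : stageConstraints) {
        acc.first = std::max(acc.first, c.first);    // strictest disk use wins
        acc.second = std::min(acc.second, c.second); // any kNotAllowed wins
    }
    return acc;
}

int main() {
    // A sub-pipeline with one in-memory stage and one stage that writes
    // persistent data (e.g. $out): the parent $lookup/$facet must report
    // kWritesPersistentData and kNotAllowed.
    auto result = resolve({
        {DiskUseRequirement::kNoDiskUse, TransactionRequirement::kAllowed},
        {DiskUseRequirement::kWritesPersistentData, TransactionRequirement::kNotAllowed},
    });
    return (result.first == DiskUseRequirement::kWritesPersistentData &&
            result.second == TransactionRequirement::kNotAllowed)
        ? 0
        : 1;
}
```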
-rw-r--r-- | jstests/core/views/views_creation.js | 16
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 212
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 4
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 2
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.h | 2
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.cpp | 22
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet_test.cpp | 104
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 25
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 25
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_test.cpp | 44
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 12
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 2
-rw-r--r-- | src/mongo/db/pipeline/stage_constraints.cpp | 36
-rw-r--r-- | src/mongo/db/pipeline/stage_constraints.h | 289
15 files changed, 538 insertions, 258 deletions
diff --git a/jstests/core/views/views_creation.js b/jstests/core/views/views_creation.js index 68c896118a5..b320fd711ae 100644 --- a/jstests/core/views/views_creation.js +++ b/jstests/core/views/views_creation.js @@ -77,12 +77,24 @@ assert.commandFailedWithCode( viewsDB.runCommand({create: "dollar$", viewOn: "collection", pipeline: pipe}), ErrorCodes.InvalidNamespace); + + // You cannot create a view with a $out stage, by itself or nested inside of a different stage. + const outStage = {$out: "nonExistentCollection"}; + assert.commandFailedWithCode( + viewsDB.runCommand({create: "viewWithOut", viewOn: "collection", pipeline: [outStage]}), + ErrorCodes.OptionNotSupportedOnView); assert.commandFailedWithCode(viewsDB.runCommand({ - create: "viewWithBadPipeline", + create: "viewWithOutInLookup", viewOn: "collection", - pipeline: [{$project: {_id: false}}, {$out: "notExistingCollection"}] + pipeline: [{$lookup: {from: "other", pipeline: [outStage], as: "result"}}] }), ErrorCodes.OptionNotSupportedOnView); + assert.commandFailedWithCode(viewsDB.runCommand({ + create: "viewWithOutInFacet", + viewOn: "collection", + pipeline: [{$facet: {output: [outStage]}}] + }), + 40600); // These test that, when an existing view in system.views is invalid because of a $out in the // pipeline, the database errors on creation of a new view. diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 40d1afff528..06c490a90d1 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -354,6 +354,7 @@ pipelineeEnv.Library( 'document_source_unwind.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', + 'stage_constraints.cpp', 'tee_buffer.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index e9bf60dba55..ea11eea4cc3 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -50,6 +50,7 @@ #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/stage_constraints.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" #include "mongo/stdx/functional.h" @@ -130,217 +131,6 @@ public: using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; - /** - * A struct describing various constraints about where this stage can run, where it must be in - * the pipeline, what resources it may require, etc. - */ - struct StageConstraints { - /** - * A StreamType defines whether this stage is streaming (can produce output based solely on - * the current input document) or blocking (must examine subsequent documents before - * producing an output document). - */ - enum class StreamType { kStreaming, kBlocking }; - - /** - * A PositionRequirement stipulates what specific position the stage must occupy within the - * pipeline, if any. - */ - enum class PositionRequirement { kNone, kFirst, kLast }; - - /** - * A HostTypeRequirement defines where this stage is permitted to be executed when the - * pipeline is run on a sharded cluster. - */ - enum class HostTypeRequirement { - // Indicates that the stage can run either on mongoD or mongoS. - kNone, - // Indicates that the stage must run on the host to which it was originally sent and - // cannot be forwarded from mongoS to the shards. 
- kLocalOnly, - // Indicates that the stage must run on the primary shard. - kPrimaryShard, - // Indicates that the stage must run on any participating shard. - kAnyShard, - // Indicates that the stage can only run on mongoS. - kMongoS, - }; - - /** - * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or - * whether it may spill temporary data to disk if its memory usage exceeds a given - * threshold. Note that this only indicates that the stage has the ability to spill; if - * 'allowDiskUse' is set to false, it will be prevented from doing so. - */ - enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; - - /** - * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream - * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is - * blacklisted from $changeStream. - */ - enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist }; - - /** - * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. - */ - enum class FacetRequirement { kAllowed, kNotAllowed }; - - /** - * Indicates whether or not this stage is legal when the read concern for the aggregate has - * readConcern level "snapshot" or is running inside of a multi-document transaction. - */ - enum class TransactionRequirement { kNotAllowed, kAllowed }; - - StageConstraints( - StreamType streamType, - PositionRequirement requiredPosition, - HostTypeRequirement hostRequirement, - DiskUseRequirement diskRequirement, - FacetRequirement facetRequirement, - TransactionRequirement transactionRequirement, - ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) - : requiredPosition(requiredPosition), - hostRequirement(hostRequirement), - diskRequirement(diskRequirement), - changeStreamRequirement(changeStreamRequirement), - facetRequirement(facetRequirement), - transactionRequirement(transactionRequirement), - streamType(streamType) { - // Stages which are allowed to run in $facet must not have any position requirements. - invariant( - !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); - - // No change stream stages are permitted to run in a $facet pipeline. - invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage())); - - // Only streaming stages are permitted in $changeStream pipelines. - invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking)); - - // A stage which is whitelisted for $changeStream cannot have a requirement to run on a - // shard, since it needs to be able to run on mongoS in a cluster. - invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && - (hostRequirement == HostTypeRequirement::kAnyShard || - hostRequirement == HostTypeRequirement::kPrimaryShard))); - - // A stage which is whitelisted for $changeStream cannot have a position requirement. - invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && - requiredPosition != PositionRequirement::kNone)); - - // Change stream stages should not be permitted with readConcern level "snapshot" or - // inside of a multi-document transaction. - if (isChangeStreamStage()) { - invariant(!isAllowedInTransaction()); - } - - // Stages which write data to user collections should not be permitted with readConcern - // level "snapshot" or inside of a multi-document transaction. 
- if (diskRequirement == DiskUseRequirement::kWritesPersistentData) { - invariant(!isAllowedInTransaction()); - } - } - - /** - * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the - * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified. - */ - HostTypeRequirement resolvedHostTypeRequirement( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const { - return (hostRequirement != HostTypeRequirement::kLocalOnly - ? hostRequirement - : (expCtx->inMongos ? HostTypeRequirement::kMongoS - : HostTypeRequirement::kAnyShard)); - } - - /** - * True if this stage must run on the same host to which it was originally sent. - */ - bool mustRunLocally() const { - return hostRequirement == HostTypeRequirement::kLocalOnly; - } - - /** - * True if this stage is permitted to run in a $facet pipeline. - */ - bool isAllowedInsideFacetStage() const { - return facetRequirement == FacetRequirement::kAllowed; - } - - /** - * True if this stage is permitted to run in a pipeline which starts with $changeStream. - */ - bool isAllowedInChangeStream() const { - return changeStreamRequirement != ChangeStreamRequirement::kBlacklist; - } - - /** - * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed - * to run in a pipeline which begins with $changeStream. - */ - bool isChangeStreamStage() const { - return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; - } - - /** - * Returns true if this stage is legal when the readConcern level is "snapshot" or when this - * aggregation is being run within a multi-document transaction. - */ - bool isAllowedInTransaction() const { - return transactionRequirement == TransactionRequirement::kAllowed; - } - - /** - * Returns true if this stage writes persistent data to disk. - */ - bool writesPersistentData() const { - return diskRequirement == DiskUseRequirement::kWritesPersistentData; - } - - // Indicates whether this stage needs to be at a particular position in the pipeline. - const PositionRequirement requiredPosition; - - // Indicates whether this stage can only be executed on specific components of a sharded - // cluster. - const HostTypeRequirement hostRequirement; - - // Indicates whether this stage may write persistent data to disk, or may spill to temporary - // files if its memory usage becomes excessive. - const DiskUseRequirement diskRequirement; - - // Indicates whether this stage is itself a $changeStream stage, or if not whether it may - // exist in a pipeline which begins with $changeStream. - const ChangeStreamRequirement changeStreamRequirement; - - // Indicates whether this stage may run inside a $facet stage. - const FacetRequirement facetRequirement; - - // Indicates whether this stage is legal when the readConcern level is "snapshot" or the - // aggregate is running inside of a multi-document transaction. - const TransactionRequirement transactionRequirement; - - // Indicates whether this is a streaming or blocking stage. - const StreamType streamType; - - // True if this stage does not generate results itself, and instead pulls inputs from an - // input DocumentSource (via 'pSource'). - bool requiresInputDocSource = true; - - // True if this stage operates on a global or database level, like $currentOp. - bool isIndependentOfAnyCollection = false; - - // True if this stage can ever be safely swapped with a subsequent $match stage, provided - // that the match does not depend on the paths returned by getModifiedPaths(). 
- // - // Stages that want to participate in match swapping should set this to true. Such a stage - // must also override getModifiedPaths() to provide information about which particular - // $match predicates be swapped before itself. - bool canSwapWithMatch = false; - - // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is - // true if this stage does not add or remove documents from the pipeline. - bool canSwapWithLimit = false; - }; - using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PositionRequirement = StageConstraints::PositionRequirement; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 5ad06bd9329..2e452b70800 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -112,9 +112,7 @@ const char* DocumentSourceOplogMatch::getSourceName() const { return DocumentSourceChangeStream::kStageName.rawData(); } -DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints( - Pipeline::SplitState pipeState) const { - +StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipeState) const { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 9e49dedcd1f..cb86e61b65a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -117,7 +117,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( } } -DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( +StageConstraints DocumentSourceChangeStreamTransform::constraints( Pipeline::SplitState pipeState) const { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index d0b2450fb91..c569b625d0b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -51,7 +51,7 @@ public: DocumentSource::GetModPathsReturn getModifiedPaths() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const; - DocumentSource::StageConstraints constraints(Pipeline::SplitState pipeState) const final; + StageConstraints constraints(Pipeline::SplitState pipeState) const final; DocumentSource::GetNextResult getNext(); const char* getSourceName() const { return DocumentSourceChangeStream::kStageName.rawData(); diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 4f2a2dd8547..55efe9538fd 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -234,15 +234,7 @@ void DocumentSourceFacet::reattachToOperationContext(OperationContext* opCtx) { } } -DocumentSource::StageConstraints DocumentSourceFacet::constraints( - Pipeline::SplitState pipeState) const { - const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { - const auto sources = 
facet.pipeline->getSources(); - return std::any_of(sources.begin(), sources.end(), [&](const auto source) { - return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData; - }); - }); - +StageConstraints DocumentSourceFacet::constraints(Pipeline::SplitState) const { // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). // This means that if any stage in any of the $facet pipelines needs to run on the primary shard // or on mongoS, then the entire $facet stage must run there. @@ -266,12 +258,20 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints( } } + // Resolve the disk use and transaction requirement of this $facet by iterating through the + // children in its facets. + auto diskAndTxnReq = StageConstraints::kDefaultDiskUseAndTransactionRequirement; + for (const auto& facet : _facets) { + diskAndTxnReq = StageConstraints::resolveDiskUseAndTransactionRequirement( + facet.pipeline->getSources(), diskAndTxnReq); + } + return {StreamType::kBlocking, PositionRequirement::kNone, host, - mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, + std::get<StageConstraints::DiskUseRequirement>(diskAndTxnReq), FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; + std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq)}; } bool DocumentSourceFacet::usedDisk() { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 9a367f14b24..6a7e52c338b 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -229,6 +229,55 @@ public: } }; +TEST_F(DocumentSourceFacetTest, PassthroughFacetDoesntRequireDiskAndIsOKInaTxn) { + auto ctx = getExpCtx(); + auto passthrough = DocumentSourcePassthrough::create(); + auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx)); + + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("passthrough", std::move(passthroughPipe)); + + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + DocumentSource::DiskUseRequirement::kNoDiskUse); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + DocumentSource::TransactionRequirement::kAllowed); +} + +/** + * A dummy DocumentSource which writes persistent data. 
+ */ +class DocumentSourceWritesPersistentData final : public DocumentSourcePassthrough { +public: + StageConstraints constraints(Pipeline::SplitState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesPersistentData, + FacetRequirement::kAllowed, + TransactionRequirement::kNotAllowed}; + } + + static boost::intrusive_ptr<DocumentSourceWritesPersistentData> create() { + return new DocumentSourceWritesPersistentData(); + } +}; + +TEST_F(DocumentSourceFacetTest, FacetWithChildThatWritesDataAlsoReportsWritingData) { + auto ctx = getExpCtx(); + auto writesDataStage = DocumentSourceWritesPersistentData::create(); + auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({writesDataStage}, ctx)); + + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("writes", std::move(pipeline)); + + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + DocumentSource::DiskUseRequirement::kWritesPersistentData); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + DocumentSource::TransactionRequirement::kNotAllowed); +} + TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { auto ctx = getExpCtx(); @@ -697,7 +746,11 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement == - DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard); + StageConstraints::HostTypeRequirement::kPrimaryShard); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + StageConstraints::DiskUseRequirement::kNoDiskUse); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + StageConstraints::TransactionRequirement::kAllowed); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -717,8 +770,55 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement == - DocumentSource::StageConstraints::HostTypeRequirement::kNone); + StageConstraints::HostTypeRequirement::kNone); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + StageConstraints::DiskUseRequirement::kNoDiskUse); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + StageConstraints::TransactionRequirement::kAllowed); } +/** + * A dummy DocumentSource that must run on the primary shard, can write temporary data and can't be + * used in a transaction. 
+ */ +class DocumentSourcePrimaryShardTmpDataNoTxn final : public DocumentSourcePassthrough { +public: + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed, + TransactionRequirement::kNotAllowed}; + } + + static boost::intrusive_ptr<DocumentSourcePrimaryShardTmpDataNoTxn> create() { + return new DocumentSourcePrimaryShardTmpDataNoTxn(); + } +}; + +TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstraint) { + auto ctx = getExpCtx(); + + auto firstPassthrough = DocumentSourcePassthrough::create(); + auto firstPipeline = + unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx)); + + auto secondPassthrough = DocumentSourcePrimaryShardTmpDataNoTxn::create(); + auto secondPipeline = + unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx)); + + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("first", std::move(firstPipeline)); + facets.emplace_back("second", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); + + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement == + StageConstraints::HostTypeRequirement::kPrimaryShard); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + StageConstraints::DiskUseRequirement::kWritesTmpData); + ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + StageConstraints::TransactionRequirement::kNotAllowed); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 16e20491f04..4762916ee3a 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -180,6 +180,31 @@ const char* DocumentSourceLookUp::getSourceName() const { return "$lookup"; } +StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { + // By default, $lookup is allowed in a transaction and does not use disk. + auto diskRequirement = DiskUseRequirement::kNoDiskUse; + auto txnRequirement = TransactionRequirement::kAllowed; + + // However, if $lookup is specified with a pipeline, it inherits the strictest disk use and + // transaction requirement from the children in its pipeline. 
+ if (wasConstructedWithPipelineSyntax()) { + const auto resolvedRequirements = StageConstraints::resolveDiskUseAndTransactionRequirement( + _parsedIntrospectionPipeline->getSources()); + diskRequirement = resolvedRequirements.first; + txnRequirement = resolvedRequirements.second; + } + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + diskRequirement, + FacetRequirement::kAllowed, + txnRequirement); + + constraints.canSwapWithMatch = true; + return constraints; +} + namespace { /** diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 5276aa917c4..de12c821443 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -103,26 +103,11 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - const bool mayUseDisk = wasConstructedWithPipelineSyntax() && - std::any_of(_parsedIntrospectionPipeline->getSources().begin(), - _parsedIntrospectionPipeline->getSources().end(), - [](const auto& source) { - return source->constraints().diskRequirement == - DiskUseRequirement::kWritesTmpData; - }); - - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kPrimaryShard, - mayUseDisk ? DiskUseRequirement::kWritesTmpData - : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed, - TransactionRequirement::kAllowed); - - constraints.canSwapWithMatch = true; - return constraints; - } + /** + * Reports the StageConstraints of this $lookup instance. A $lookup constructed with pipeline + * syntax will inherit certain constraints from the stages in its pipeline. 
+ */ + StageConstraints constraints(Pipeline::SplitState) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 00c0baeecdb..f516626a47e 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -193,6 +193,50 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) { ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax()); } +TEST_F(DocumentSourceLookUpTest, LookupEmptyPipelineDoesntUseDiskAndIsOKInATransaction) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = DocumentSourceLookUp::createFromBson( + BSON("$lookup" << BSON("from" << fromNs.coll().toString() << "pipeline" << BSONArray() + << "as" + << "as")) + .firstElement(), + expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get()); + + ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax()); + ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + DocumentSource::DiskUseRequirement::kNoDiskUse); + ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + DocumentSource::TransactionRequirement::kAllowed); +} + +TEST_F(DocumentSourceLookUpTest, LookupWithChildStagesInheritsDiskUseRequirement) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}}); + + auto docSource = + DocumentSourceLookUp::createFromBson(BSON("$lookup" << BSON("from" + << "coll" + << "pipeline" + << BSON_ARRAY(BSON("$out" + << "target")) + << "as" + << "as")) + .firstElement(), + expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(docSource.get()); + + ASSERT_TRUE(lookup->wasConstructedWithPipelineSyntax()); + ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).diskRequirement == + DocumentSource::DiskUseRequirement::kWritesPersistentData); + ASSERT(lookup->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == + DocumentSource::TransactionRequirement::kNotAllowed); +} + TEST_F(DocumentSourceLookUpTest, LiteParsedDocumentSourceLookupContainsExpectedNamespaces) { auto stageSpec = BSON("$lookup" << BSON("from" diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 3c84881323f..6ac5e76d21d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -69,12 +69,12 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; -using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement; -using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; -using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; -using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement; -using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement; -using StreamType = DocumentSource::StageConstraints::StreamType; +using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; +using HostTypeRequirement = StageConstraints::HostTypeRequirement; +using PositionRequirement = StageConstraints::PositionRequirement; +using DiskUseRequirement = StageConstraints::DiskUseRequirement; +using FacetRequirement = StageConstraints::FacetRequirement; +using 
StreamType = StageConstraints::StreamType; constexpr MatchExpressionParser::AllowedFeatureSet Pipeline::kAllowedMatcherFeatures; constexpr MatchExpressionParser::AllowedFeatureSet Pipeline::kGeoNearMatcherFeatures; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 280473848cc..64b0dcfe7c2 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2187,7 +2187,7 @@ public: } }; -using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; +using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PipelineMustRunOnMongoSTest = AggregationContextFixture; TEST_F(PipelineMustRunOnMongoSTest, UnsplittablePipelineMustRunOnMongoS) { diff --git a/src/mongo/db/pipeline/stage_constraints.cpp b/src/mongo/db/pipeline/stage_constraints.cpp new file mode 100644 index 00000000000..8f937e09089 --- /dev/null +++ b/src/mongo/db/pipeline/stage_constraints.cpp @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/stage_constraints.h" + +namespace mongo { +constexpr StageConstraints::DiskUseAndTransactionRequirement + StageConstraints::kDefaultDiskUseAndTransactionRequirement; +} // namespace mongo diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h new file mode 100644 index 00000000000..c55430ea181 --- /dev/null +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -0,0 +1,289 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <numeric> + +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +/** + * A struct describing various constraints about where this stage can run, where it must be in + * the pipeline, what resources it may require, etc. + */ +struct StageConstraints { + /** + * A StreamType defines whether this stage is streaming (can produce output based solely on + * the current input document) or blocking (must examine subsequent documents before + * producing an output document). + */ + enum class StreamType { kStreaming, kBlocking }; + + /** + * A PositionRequirement stipulates what specific position the stage must occupy within the + * pipeline, if any. + */ + enum class PositionRequirement { kNone, kFirst, kLast }; + + /** + * A HostTypeRequirement defines where this stage is permitted to be executed when the + * pipeline is run on a sharded cluster. + */ + enum class HostTypeRequirement { + // Indicates that the stage can run either on mongoD or mongoS. + kNone, + // Indicates that the stage must run on the host to which it was originally sent and + // cannot be forwarded from mongoS to the shards. + kLocalOnly, + // Indicates that the stage must run on the primary shard. + kPrimaryShard, + // Indicates that the stage must run on any participating shard. + kAnyShard, + // Indicates that the stage can only run on mongoS. + kMongoS, + }; + + /** + * A DiskUseRequirement indicates whether this stage writes permanent data to disk, or + * whether it may spill temporary data to disk if its memory usage exceeds a given + * threshold. Note that this only indicates that the stage has the ability to spill; if + * 'allowDiskUse' is set to false, it will be prevented from doing so. + * + * This enum is purposefully ordered such that a "stronger" need to write data has a higher + * enum value. + */ + enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; + + /** + * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream + * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is + * blacklisted from $changeStream. + */ + enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist }; + + /** + * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. 
+ */ + enum class FacetRequirement { kAllowed, kNotAllowed }; + + /** + * Indicates whether or not this stage is legal when the read concern for the aggregate has + * readConcern level "snapshot" or is running inside of a multi-document transaction. + */ + enum class TransactionRequirement { kNotAllowed, kAllowed }; + + using DiskUseAndTransactionRequirement = std::pair<DiskUseRequirement, TransactionRequirement>; + + /** + * By default, a stage is assumed to use no disk and be allowed to run in a transaction. + */ + static constexpr auto kDefaultDiskUseAndTransactionRequirement = + std::make_pair(DiskUseRequirement::kNoDiskUse, TransactionRequirement::kAllowed); + + /** + * Given a 'pipeline' of DocumentSources, resolves the container's disk use requirement and + * transaction requirement: + * + * - Returns the "strictest" DiskUseRequirement reported by the stages in 'pipeline', + * where the strictness order is kNone < kWritesTmpData < kWritesPersistentData. For example, + * in a pipeline where all three DiskUseRequirements are present, the return value will be + * DiskUseRequirement::kWritesPersistentData. + * + * - Returns TransactionRequirement::kAllowed if and only if every DocumentSource in + * 'pipeline' is allowed in a transaction. + */ + template <typename DocumentSourceContainer> + static DiskUseAndTransactionRequirement resolveDiskUseAndTransactionRequirement( + const DocumentSourceContainer& pipeline, + DiskUseAndTransactionRequirement defaultReqs = kDefaultDiskUseAndTransactionRequirement) { + return std::accumulate( + pipeline.begin(), + pipeline.end(), + defaultReqs, + [](const DiskUseAndTransactionRequirement& constraints, const auto& stage) { + const auto stageConstraints = stage->constraints(); + const auto diskUse = std::max(constraints.first, stageConstraints.diskRequirement); + const auto txnReq = + std::min(constraints.second, stageConstraints.transactionRequirement); + return std::make_pair(diskUse, txnReq); + }); + } + + StageConstraints( + StreamType streamType, + PositionRequirement requiredPosition, + HostTypeRequirement hostRequirement, + DiskUseRequirement diskRequirement, + FacetRequirement facetRequirement, + TransactionRequirement transactionRequirement, + ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) + : requiredPosition(requiredPosition), + hostRequirement(hostRequirement), + diskRequirement(diskRequirement), + changeStreamRequirement(changeStreamRequirement), + facetRequirement(facetRequirement), + transactionRequirement(transactionRequirement), + streamType(streamType) { + // Stages which are allowed to run in $facet must not have any position requirements. + invariant(!(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); + + // No change stream stages are permitted to run in a $facet pipeline. + invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage())); + + // Only streaming stages are permitted in $changeStream pipelines. + invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking)); + + // A stage which is whitelisted for $changeStream cannot have a requirement to run on a + // shard, since it needs to be able to run on mongoS in a cluster. + invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + (hostRequirement == HostTypeRequirement::kAnyShard || + hostRequirement == HostTypeRequirement::kPrimaryShard))); + + // A stage which is whitelisted for $changeStream cannot have a position requirement. 
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + requiredPosition != PositionRequirement::kNone)); + + // Change stream stages should not be permitted with readConcern level "snapshot" or + // inside of a multi-document transaction. + if (isChangeStreamStage()) { + invariant(!isAllowedInTransaction()); + } + + // Stages which write data to user collections should not be permitted with readConcern + // level "snapshot" or inside of a multi-document transaction. + // TODO (SERVER-36259): relax this requirement when $out (which writes persistent data) + // is allowed in a transaction. + if (diskRequirement == DiskUseRequirement::kWritesPersistentData) { + invariant(!isAllowedInTransaction()); + } + } + + /** + * Returns the literal HostTypeRequirement used to initialize the StageConstraints, or the + * effective HostTypeRequirement (kAnyShard or kMongoS) if kLocalOnly was specified. + */ + HostTypeRequirement resolvedHostTypeRequirement( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const { + return (hostRequirement != HostTypeRequirement::kLocalOnly + ? hostRequirement + : (expCtx->inMongos ? HostTypeRequirement::kMongoS + : HostTypeRequirement::kAnyShard)); + } + + /** + * True if this stage must run on the same host to which it was originally sent. + */ + bool mustRunLocally() const { + return hostRequirement == HostTypeRequirement::kLocalOnly; + } + + /** + * True if this stage is permitted to run in a $facet pipeline. + */ + bool isAllowedInsideFacetStage() const { + return facetRequirement == FacetRequirement::kAllowed; + } + + /** + * True if this stage is permitted to run in a pipeline which starts with $changeStream. + */ + bool isAllowedInChangeStream() const { + return changeStreamRequirement != ChangeStreamRequirement::kBlacklist; + } + + /** + * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed + * to run in a pipeline which begins with $changeStream. + */ + bool isChangeStreamStage() const { + return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; + } + + /** + * Returns true if this stage is legal when the readConcern level is "snapshot" or when this + * aggregation is being run within a multi-document transaction. + */ + bool isAllowedInTransaction() const { + return transactionRequirement == TransactionRequirement::kAllowed; + } + + /** + * Returns true if this stage writes persistent data to disk. + */ + bool writesPersistentData() const { + return diskRequirement == DiskUseRequirement::kWritesPersistentData; + } + + // Indicates whether this stage needs to be at a particular position in the pipeline. + const PositionRequirement requiredPosition; + + // Indicates whether this stage can only be executed on specific components of a sharded + // cluster. + const HostTypeRequirement hostRequirement; + + // Indicates whether this stage may write persistent data to disk, or may spill to temporary + // files if its memory usage becomes excessive. + const DiskUseRequirement diskRequirement; + + // Indicates whether this stage is itself a $changeStream stage, or if not whether it may + // exist in a pipeline which begins with $changeStream. + const ChangeStreamRequirement changeStreamRequirement; + + // Indicates whether this stage may run inside a $facet stage. + const FacetRequirement facetRequirement; + + // Indicates whether this stage is legal when the readConcern level is "snapshot" or the + // aggregate is running inside of a multi-document transaction. 
+ const TransactionRequirement transactionRequirement; + + // Indicates whether this is a streaming or blocking stage. + const StreamType streamType; + + // True if this stage does not generate results itself, and instead pulls inputs from an + // input DocumentSource (via 'pSource'). + bool requiresInputDocSource = true; + + // True if this stage operates on a global or database level, like $currentOp. + bool isIndependentOfAnyCollection = false; + + // True if this stage can ever be safely swapped with a subsequent $match stage, provided + // that the match does not depend on the paths returned by getModifiedPaths(). + // + // Stages that want to participate in match swapping should set this to true. Such a stage + // must also override getModifiedPaths() to provide information about which particular + // $match predicates be swapped before itself. + bool canSwapWithMatch = false; + + // True if a subsequent $limit stage can be moved before this stage in the pipeline. This is + // true if this stage does not add or remove documents from the pipeline. + bool canSwapWithLimit = false; +}; +} // namespace mongo |