diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-04 18:26:30 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-10 10:36:25 +0100 |
commit | f74896d9aae90d0b31eeb06f37f59ca386c03570 (patch) | |
tree | a05779131bc9fd608d2e6c6c0608251ea7a21d81 /src/mongo/db | |
parent | 6ad292704df43031ee94f696e709bf6285376ed4 (diff) | |
download | mongo-f74896d9aae90d0b31eeb06f37f59ca386c03570.tar.gz |
SERVER-29137 Implement $changeStream whitelist
Diffstat (limited to 'src/mongo/db')
9 files changed, 101 insertions, 28 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 3e3fd98a1df..bc9dce714ad 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -174,24 +174,49 @@ public: enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; /** + * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream + * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is + * blacklisted from $changeStream. + */ + enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist }; + + /** * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. */ enum class FacetRequirement { kAllowed, kNotAllowed }; - StageConstraints(StreamType streamType, - PositionRequirement requiredPosition, - HostTypeRequirement hostRequirement, - DiskUseRequirement diskRequirement, - FacetRequirement facetRequirement) + StageConstraints( + StreamType streamType, + PositionRequirement requiredPosition, + HostTypeRequirement hostRequirement, + DiskUseRequirement diskRequirement, + FacetRequirement facetRequirement, + ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) : requiredPosition(requiredPosition), hostRequirement(hostRequirement), diskRequirement(diskRequirement), + changeStreamRequirement(changeStreamRequirement), facetRequirement(facetRequirement), streamType(streamType) { - // Stages which are allowed to run in $facet pipelines must not have any specific - // position requirements. - invariant(!isAllowedInsideFacetStage() || - requiredPosition == PositionRequirement::kNone); + // Stages which are allowed to run in $facet must not have any position requirements. + invariant( + !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); + + // No change stream stages are permitted to run in a $facet pipeline. + invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage())); + + // Only streaming stages are permitted in $changeStream pipelines. + invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking)); + + // A stage which is whitelisted for $changeStream cannot have a requirement to run on a + // shard, since it needs to be able to run on mongoS in a cluster. + invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + (hostRequirement == HostTypeRequirement::kAnyShard || + hostRequirement == HostTypeRequirement::kPrimaryShard))); + + // A stage which is whitelisted for $changeStream cannot have a position requirement. + invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + requiredPosition != PositionRequirement::kNone)); } /** @@ -214,12 +239,27 @@ public: } /** - * True if this stage should be permitted to run in a $facet pipeline. + * True if this stage is permitted to run in a $facet pipeline. */ bool isAllowedInsideFacetStage() const { return facetRequirement == FacetRequirement::kAllowed; } + /** + * True if this stage is permitted to run in a pipeline which starts with $changeStream. + */ + bool isAllowedInChangeStream() const { + return changeStreamRequirement != ChangeStreamRequirement::kBlacklist; + } + + /** + * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed + * to run in a pipeline which begins with $changeStream. + */ + bool isChangeStreamStage() const { + return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; + } + // Indicates whether this stage needs to be at a particular position in the pipeline. const PositionRequirement requiredPosition; @@ -231,6 +271,10 @@ public: // files if its memory usage becomes excessive. const DiskUseRequirement diskRequirement; + // Indicates whether this stage is itself a $changeStream stage, or if not whether it may + // exist in a pipeline which begins with $changeStream. + const ChangeStreamRequirement changeStreamRequirement; + // Indicates whether this stage may run inside a $facet stage. const FacetRequirement facetRequirement; @@ -253,6 +297,7 @@ public: bool canSwapWithMatch = false; }; + using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PositionRequirement = StageConstraints::PositionRequirement; using DiskUseRequirement = StageConstraints::DiskUseRequirement; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index c8641d5c943..64d01baf450 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -106,7 +106,8 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints( PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } /** @@ -156,7 +157,8 @@ public: (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 8f5877887cb..956135f9d7a 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -64,7 +64,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; @@ -101,7 +102,8 @@ public: (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } /** diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 75518fa0dec..18d38b52fdb 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -66,7 +66,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index d6705fc223b..eabc4de92ae 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -55,7 +55,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + ChangeStreamRequirement::kWhitelist}; } Value serialize( diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index 0e13e9e48d3..c6c83794aec 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -45,7 +45,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + ChangeStreamRequirement::kWhitelist}; } /** diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 557ec06c9f2..70403cdf3c2 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -102,11 +102,18 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); + StageConstraints constraints( + StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation + ? FacetRequirement::kNotAllowed + : FacetRequirement::kAllowed), + (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation + ? ChangeStreamRequirement::kChangeStreamStage + : ChangeStreamRequirement::kWhitelist)); + constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index e8cb7f25f94..1b6f69d823e 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -68,7 +68,9 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed); + _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed, + _mergingPresorted ? ChangeStreamRequirement::kWhitelist + : ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. constraints.canSwapWithMatch = !limitSrc; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 4fedca608ea..2053f6ac5b5 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,7 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement; @@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const { } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) { // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, // {aggregate: 1} is only valid for collectionless sources, and vice-versa. - const auto firstStage = _sources.front().get(); + const auto firstStageConstraints = _sources.front()->constraints(_splitState); if (nss.isCollectionlessAggregateNS() && - !firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + !firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" - << firstStage->getSourceName() + << _sources.front()->getSourceName() << "'; a collection is required."); } if (!nss.isCollectionlessAggregateNS() && - firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, - str::stream() << "'" << firstStage->getSourceName() + str::stream() << "'" << _sources.front()->getSourceName() << "' can only be run with {aggregate: 1}"); } + + // If the first stage is a $changeStream stage, then all stages in the pipeline must be + // either $changeStream stages or whitelisted as being able to run in a change stream. + if (firstStageConstraints.isChangeStreamStage()) { + for (auto&& source : _sources) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << source->getSourceName() + << " is not permitted in a $changeStream pipeline", + source->constraints(_splitState).isAllowedInChangeStream()); + } + } } // Verify that each stage is in a legal position within the pipeline. |