diff options
-rw-r--r-- | jstests/change_streams/change_stream_whitelist.js | 61 | ||||
-rw-r--r-- | jstests/sharding/change_streams.js | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 65 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_match.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_redact.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_single_document_transformation.h | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 22 |
11 files changed, 170 insertions, 33 deletions
diff --git a/jstests/change_streams/change_stream_whitelist.js b/jstests/change_streams/change_stream_whitelist.js new file mode 100644 index 00000000000..817dd35fef4 --- /dev/null +++ b/jstests/change_streams/change_stream_whitelist.js @@ -0,0 +1,61 @@ +/** + * Tests that only whitelisted stages are permitted to run in a $changeStream pipeline. + */ + +(function() { + "use strict"; + + load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). + + // Bare-bones $changeStream pipeline which will be augmented during tests. + const changeStream = [{$changeStream: {}}]; + const coll = db[jsTestName()]; + + // List of non-$changeStream stages which are explicitly whitelisted. + const whitelist = [ + {$match: {_id: {$exists: true}}}, + {$project: {_id: 1}}, + {$addFields: {newField: 1}}, + {$replaceRoot: {newRoot: {_id: "$_id"}}}, + {$redact: "$$DESCEND"} + ]; + + // List of stages which the whitelist mechanism will prevent from running in a $changeStream. + // Does not include stages which are blacklisted but already implicitly prohibited, e.g. both + // $currentOp and $changeStream must be the first stage in a pipeline. + const blacklist = [ + {$group: {_id: "$_id"}}, + {$sort: {_id: 1}}, + {$skip: 100}, + {$limit: 100}, + {$sample: {size: 100}}, + {$unwind: "$_id"}, + {$lookup: {from: "coll", as: "as", localField: "_id", foreignField: "_id"}}, + { + $graphLookup: { + from: "coll", + as: "as", + startWith: "$_id", + connectFromField: "_id", + connectToField: "_id" + } + }, + {$bucketAuto: {groupBy: "$_id", buckets: 2}}, + {$facet: {facetPipe: [{$match: {_id: {$exists: true}}}]}} + ]; + + // Verify that each of the whitelisted stages are permitted to run in a $changeStream. + for (let allowedStage of whitelist) { + assert.commandWorked(db.runCommand( + {aggregate: coll.getName(), pipeline: changeStream.concat(allowedStage), cursor: {}})); + } + + // Verify that all of the whitelisted stages are able to run in a $changeStream together. + assert.commandWorked(db.runCommand( + {aggregate: coll.getName(), pipeline: changeStream.concat(whitelist), cursor: {}})); + + // Verify that a $changeStream pipeline fails to validate if a blacklisted stage is present. + for (let bannedStage of blacklist) { + assertErrorCode(coll, changeStream.concat(bannedStage), ErrorCodes.IllegalOperation); + } +}());
\ No newline at end of file diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index 200190000da..a2a1a83664a 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -102,14 +102,17 @@ mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); tempCursor.close(); - // TODO SERVER-29137: $sort and $group should be banned. - tempCursor = assert.doesNotThrow( - () => mongosColl.aggregate( - [{$changeStream: {}}, {$sort: {operationType: 1}}, {$group: {_id: "$documentKey"}}])); - tempCursor.close(); assert.commandWorked( mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); + // Test that $sort and $group are banned from running in a $changeStream pipeline. + assertErrorCode(mongosColl, + [{$changeStream: {}}, {$sort: {operationType: 1}}], + ErrorCodes.IllegalOperation); + assertErrorCode(mongosColl, + [{$changeStream: {}}, {$group: {_id: "$documentKey"}}], + ErrorCodes.IllegalOperation); + assert.writeOK(mongosColl.remove({})); // We awaited the replication of the first write, so the change stream shouldn't return it. // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 3e3fd98a1df..bc9dce714ad 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -174,24 +174,49 @@ public: enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; /** + * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream + * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is + * blacklisted from $changeStream. + */ + enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist }; + + /** * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. */ enum class FacetRequirement { kAllowed, kNotAllowed }; - StageConstraints(StreamType streamType, - PositionRequirement requiredPosition, - HostTypeRequirement hostRequirement, - DiskUseRequirement diskRequirement, - FacetRequirement facetRequirement) + StageConstraints( + StreamType streamType, + PositionRequirement requiredPosition, + HostTypeRequirement hostRequirement, + DiskUseRequirement diskRequirement, + FacetRequirement facetRequirement, + ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) : requiredPosition(requiredPosition), hostRequirement(hostRequirement), diskRequirement(diskRequirement), + changeStreamRequirement(changeStreamRequirement), facetRequirement(facetRequirement), streamType(streamType) { - // Stages which are allowed to run in $facet pipelines must not have any specific - // position requirements. - invariant(!isAllowedInsideFacetStage() || - requiredPosition == PositionRequirement::kNone); + // Stages which are allowed to run in $facet must not have any position requirements. + invariant( + !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); + + // No change stream stages are permitted to run in a $facet pipeline. + invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage())); + + // Only streaming stages are permitted in $changeStream pipelines. + invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking)); + + // A stage which is whitelisted for $changeStream cannot have a requirement to run on a + // shard, since it needs to be able to run on mongoS in a cluster. + invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + (hostRequirement == HostTypeRequirement::kAnyShard || + hostRequirement == HostTypeRequirement::kPrimaryShard))); + + // A stage which is whitelisted for $changeStream cannot have a position requirement. + invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && + requiredPosition != PositionRequirement::kNone)); } /** @@ -214,12 +239,27 @@ public: } /** - * True if this stage should be permitted to run in a $facet pipeline. + * True if this stage is permitted to run in a $facet pipeline. */ bool isAllowedInsideFacetStage() const { return facetRequirement == FacetRequirement::kAllowed; } + /** + * True if this stage is permitted to run in a pipeline which starts with $changeStream. + */ + bool isAllowedInChangeStream() const { + return changeStreamRequirement != ChangeStreamRequirement::kBlacklist; + } + + /** + * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed + * to run in a pipeline which begins with $changeStream. + */ + bool isChangeStreamStage() const { + return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; + } + // Indicates whether this stage needs to be at a particular position in the pipeline. const PositionRequirement requiredPosition; @@ -231,6 +271,10 @@ public: // files if its memory usage becomes excessive. const DiskUseRequirement diskRequirement; + // Indicates whether this stage is itself a $changeStream stage, or if not whether it may + // exist in a pipeline which begins with $changeStream. + const ChangeStreamRequirement changeStreamRequirement; + // Indicates whether this stage may run inside a $facet stage. const FacetRequirement facetRequirement; @@ -253,6 +297,7 @@ public: bool canSwapWithMatch = false; }; + using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PositionRequirement = StageConstraints::PositionRequirement; using DiskUseRequirement = StageConstraints::DiskUseRequirement; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index c8641d5c943..64d01baf450 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -106,7 +106,8 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints( PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } /** @@ -156,7 +157,8 @@ public: (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 8f5877887cb..956135f9d7a 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -64,7 +64,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; @@ -101,7 +102,8 @@ public: (pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; } /** diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 75518fa0dec..18d38b52fdb 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -66,7 +66,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index d6705fc223b..eabc4de92ae 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -55,7 +55,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + ChangeStreamRequirement::kWhitelist}; } Value serialize( diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index 0e13e9e48d3..c6c83794aec 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -45,7 +45,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + ChangeStreamRequirement::kWhitelist}; } /** diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 557ec06c9f2..70403cdf3c2 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -102,11 +102,18 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); + StageConstraints constraints( + StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation + ? FacetRequirement::kNotAllowed + : FacetRequirement::kAllowed), + (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation + ? ChangeStreamRequirement::kChangeStreamStage + : ChangeStreamRequirement::kWhitelist)); + constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index e8cb7f25f94..1b6f69d823e 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -68,7 +68,9 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed); + _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed, + _mergingPresorted ? ChangeStreamRequirement::kWhitelist + : ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. constraints.canSwapWithMatch = !limitSrc; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 4fedca608ea..2053f6ac5b5 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,7 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement; @@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const { } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) { // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, // {aggregate: 1} is only valid for collectionless sources, and vice-versa. - const auto firstStage = _sources.front().get(); + const auto firstStageConstraints = _sources.front()->constraints(_splitState); if (nss.isCollectionlessAggregateNS() && - !firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + !firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" - << firstStage->getSourceName() + << _sources.front()->getSourceName() << "'; a collection is required."); } if (!nss.isCollectionlessAggregateNS() && - firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, - str::stream() << "'" << firstStage->getSourceName() + str::stream() << "'" << _sources.front()->getSourceName() << "' can only be run with {aggregate: 1}"); } + + // If the first stage is a $changeStream stage, then all stages in the pipeline must be + // either $changeStream stages or whitelisted as being able to run in a change stream. + if (firstStageConstraints.isChangeStreamStage()) { + for (auto&& source : _sources) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << source->getSourceName() + << " is not permitted in a $changeStream pipeline", + source->constraints(_splitState).isAllowedInChangeStream()); + } + } } // Verify that each stage is in a legal position within the pipeline. |