summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/change_stream_whitelist.js61
-rw-r--r--jstests/sharding/change_streams.js13
-rw-r--r--src/mongo/db/pipeline/document_source.h65
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h3
-rw-r--r--src/mongo/db/pipeline/document_source_match.h3
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h3
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h17
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp22
11 files changed, 170 insertions, 33 deletions
diff --git a/jstests/change_streams/change_stream_whitelist.js b/jstests/change_streams/change_stream_whitelist.js
new file mode 100644
index 00000000000..817dd35fef4
--- /dev/null
+++ b/jstests/change_streams/change_stream_whitelist.js
@@ -0,0 +1,61 @@
+/**
+ * Tests that only whitelisted stages are permitted to run in a $changeStream pipeline.
+ */
+
+(function() {
+ "use strict";
+
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
+
+ // Bare-bones $changeStream pipeline which will be augmented during tests.
+ const changeStream = [{$changeStream: {}}];
+ const coll = db[jsTestName()];
+
+ // List of non-$changeStream stages which are explicitly whitelisted.
+ const whitelist = [
+ {$match: {_id: {$exists: true}}},
+ {$project: {_id: 1}},
+ {$addFields: {newField: 1}},
+ {$replaceRoot: {newRoot: {_id: "$_id"}}},
+ {$redact: "$$DESCEND"}
+ ];
+
+ // List of stages which the whitelist mechanism will prevent from running in a $changeStream.
+ // Does not include stages which are blacklisted but already implicitly prohibited, e.g. both
+ // $currentOp and $changeStream must be the first stage in a pipeline.
+ const blacklist = [
+ {$group: {_id: "$_id"}},
+ {$sort: {_id: 1}},
+ {$skip: 100},
+ {$limit: 100},
+ {$sample: {size: 100}},
+ {$unwind: "$_id"},
+ {$lookup: {from: "coll", as: "as", localField: "_id", foreignField: "_id"}},
+ {
+ $graphLookup: {
+ from: "coll",
+ as: "as",
+ startWith: "$_id",
+ connectFromField: "_id",
+ connectToField: "_id"
+ }
+ },
+ {$bucketAuto: {groupBy: "$_id", buckets: 2}},
+ {$facet: {facetPipe: [{$match: {_id: {$exists: true}}}]}}
+ ];
+
+ // Verify that each of the whitelisted stages are permitted to run in a $changeStream.
+ for (let allowedStage of whitelist) {
+ assert.commandWorked(db.runCommand(
+ {aggregate: coll.getName(), pipeline: changeStream.concat(allowedStage), cursor: {}}));
+ }
+
+ // Verify that all of the whitelisted stages are able to run in a $changeStream together.
+ assert.commandWorked(db.runCommand(
+ {aggregate: coll.getName(), pipeline: changeStream.concat(whitelist), cursor: {}}));
+
+ // Verify that a $changeStream pipeline fails to validate if a blacklisted stage is present.
+ for (let bannedStage of blacklist) {
+ assertErrorCode(coll, changeStream.concat(bannedStage), ErrorCodes.IllegalOperation);
+ }
+}()); \ No newline at end of file
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index 200190000da..a2a1a83664a 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -102,14 +102,17 @@
mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
tempCursor.close();
- // TODO SERVER-29137: $sort and $group should be banned.
- tempCursor = assert.doesNotThrow(
- () => mongosColl.aggregate(
- [{$changeStream: {}}, {$sort: {operationType: 1}}, {$group: {_id: "$documentKey"}}]));
- tempCursor.close();
assert.commandWorked(
mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+ // Test that $sort and $group are banned from running in a $changeStream pipeline.
+ assertErrorCode(mongosColl,
+ [{$changeStream: {}}, {$sort: {operationType: 1}}],
+ ErrorCodes.IllegalOperation);
+ assertErrorCode(mongosColl,
+ [{$changeStream: {}}, {$group: {_id: "$documentKey"}}],
+ ErrorCodes.IllegalOperation);
+
assert.writeOK(mongosColl.remove({}));
// We awaited the replication of the first write, so the change stream shouldn't return it.
// Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 3e3fd98a1df..bc9dce714ad 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -174,24 +174,49 @@ public:
enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
/**
+ * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream
+ * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is
+ * blacklisted from $changeStream.
+ */
+ enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist };
+
+ /**
* A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
*/
enum class FacetRequirement { kAllowed, kNotAllowed };
- StageConstraints(StreamType streamType,
- PositionRequirement requiredPosition,
- HostTypeRequirement hostRequirement,
- DiskUseRequirement diskRequirement,
- FacetRequirement facetRequirement)
+ StageConstraints(
+ StreamType streamType,
+ PositionRequirement requiredPosition,
+ HostTypeRequirement hostRequirement,
+ DiskUseRequirement diskRequirement,
+ FacetRequirement facetRequirement,
+ ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
: requiredPosition(requiredPosition),
hostRequirement(hostRequirement),
diskRequirement(diskRequirement),
+ changeStreamRequirement(changeStreamRequirement),
facetRequirement(facetRequirement),
streamType(streamType) {
- // Stages which are allowed to run in $facet pipelines must not have any specific
- // position requirements.
- invariant(!isAllowedInsideFacetStage() ||
- requiredPosition == PositionRequirement::kNone);
+ // Stages which are allowed to run in $facet must not have any position requirements.
+ invariant(
+ !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));
+
+ // No change stream stages are permitted to run in a $facet pipeline.
+ invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));
+
+ // Only streaming stages are permitted in $changeStream pipelines.
+ invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));
+
+ // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
+ // shard, since it needs to be able to run on mongoS in a cluster.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ (hostRequirement == HostTypeRequirement::kAnyShard ||
+ hostRequirement == HostTypeRequirement::kPrimaryShard)));
+
+ // A stage which is whitelisted for $changeStream cannot have a position requirement.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ requiredPosition != PositionRequirement::kNone));
}
/**
@@ -214,12 +239,27 @@ public:
}
/**
- * True if this stage should be permitted to run in a $facet pipeline.
+ * True if this stage is permitted to run in a $facet pipeline.
*/
bool isAllowedInsideFacetStage() const {
return facetRequirement == FacetRequirement::kAllowed;
}
+ /**
+ * True if this stage is permitted to run in a pipeline which starts with $changeStream.
+ */
+ bool isAllowedInChangeStream() const {
+ return changeStreamRequirement != ChangeStreamRequirement::kBlacklist;
+ }
+
+ /**
+ * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed
+ * to run in a pipeline which begins with $changeStream.
+ */
+ bool isChangeStreamStage() const {
+ return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
+ }
+
// Indicates whether this stage needs to be at a particular position in the pipeline.
const PositionRequirement requiredPosition;
@@ -231,6 +271,10 @@ public:
// files if its memory usage becomes excessive.
const DiskUseRequirement diskRequirement;
+ // Indicates whether this stage is itself a $changeStream stage, or if not whether it may
+ // exist in a pipeline which begins with $changeStream.
+ const ChangeStreamRequirement changeStreamRequirement;
+
// Indicates whether this stage may run inside a $facet stage.
const FacetRequirement facetRequirement;
@@ -253,6 +297,7 @@ public:
bool canSwapWithMatch = false;
};
+ using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PositionRequirement = StageConstraints::PositionRequirement;
using DiskUseRequirement = StageConstraints::DiskUseRequirement;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index c8641d5c943..64d01baf450 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -106,7 +106,8 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints(
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
/**
@@ -156,7 +157,8 @@ public:
(pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 8f5877887cb..956135f9d7a 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -64,7 +64,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -101,7 +102,8 @@ public:
(pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 75518fa0dec..18d38b52fdb 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -66,7 +66,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage);
constraints.canSwapWithMatch = true;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index d6705fc223b..eabc4de92ae 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -55,7 +55,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ ChangeStreamRequirement::kWhitelist};
}
Value serialize(
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index 0e13e9e48d3..c6c83794aec 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -45,7 +45,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ ChangeStreamRequirement::kWhitelist};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 557ec06c9f2..70403cdf3c2 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -102,11 +102,18 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
+ StageConstraints constraints(
+ StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
+ ? FacetRequirement::kNotAllowed
+ : FacetRequirement::kAllowed),
+ (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
+ ? ChangeStreamRequirement::kChangeStreamStage
+ : ChangeStreamRequirement::kWhitelist));
+
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index e8cb7f25f94..1b6f69d823e 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -68,7 +68,9 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
_mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed);
+ _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed,
+ _mergingPresorted ? ChangeStreamRequirement::kWhitelist
+ : ChangeStreamRequirement::kBlacklist);
// Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
constraints.canSwapWithMatch = !limitSrc;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 4fedca608ea..2053f6ac5b5 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,7 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
@@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const {
} else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
- const auto firstStage = _sources.front().get();
+ const auto firstStageConstraints = _sources.front()->constraints(_splitState);
if (nss.isCollectionlessAggregateNS() &&
- !firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ !firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
- << firstStage->getSourceName()
+ << _sources.front()->getSourceName()
<< "'; a collection is required.");
}
if (!nss.isCollectionlessAggregateNS() &&
- firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
- str::stream() << "'" << firstStage->getSourceName()
+ str::stream() << "'" << _sources.front()->getSourceName()
<< "' can only be run with {aggregate: 1}");
}
+
+ // If the first stage is a $changeStream stage, then all stages in the pipeline must be
+ // either $changeStream stages or whitelisted as being able to run in a change stream.
+ if (firstStageConstraints.isChangeStreamStage()) {
+ for (auto&& source : _sources) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << source->getSourceName()
+ << " is not permitted in a $changeStream pipeline",
+ source->constraints(_splitState).isAllowedInChangeStream());
+ }
+ }
}
// Verify that each stage is in a legal position within the pipeline.