summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-10-04 18:26:30 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-10-10 10:36:25 +0100
commitf74896d9aae90d0b31eeb06f37f59ca386c03570 (patch)
treea05779131bc9fd608d2e6c6c0608251ea7a21d81 /src/mongo/db
parent6ad292704df43031ee94f696e709bf6285376ed4 (diff)
downloadmongo-f74896d9aae90d0b31eeb06f37f59ca386c03570.tar.gz
SERVER-29137 Implement $changeStream whitelist
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source.h65
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h3
-rw-r--r--src/mongo/db/pipeline/document_source_match.h3
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h3
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h17
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp22
9 files changed, 101 insertions, 28 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 3e3fd98a1df..bc9dce714ad 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -174,24 +174,49 @@ public:
enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
/**
+ * A ChangeStreamRequirement determines whether a particular stage is itself a ChangeStream
+ * stage, whether it is allowed to exist in a $changeStream pipeline, or whether it is
+ * blacklisted from $changeStream.
+ */
+ enum class ChangeStreamRequirement { kChangeStreamStage, kWhitelist, kBlacklist };
+
+ /**
* A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
*/
enum class FacetRequirement { kAllowed, kNotAllowed };
- StageConstraints(StreamType streamType,
- PositionRequirement requiredPosition,
- HostTypeRequirement hostRequirement,
- DiskUseRequirement diskRequirement,
- FacetRequirement facetRequirement)
+ StageConstraints(
+ StreamType streamType,
+ PositionRequirement requiredPosition,
+ HostTypeRequirement hostRequirement,
+ DiskUseRequirement diskRequirement,
+ FacetRequirement facetRequirement,
+ ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
: requiredPosition(requiredPosition),
hostRequirement(hostRequirement),
diskRequirement(diskRequirement),
+ changeStreamRequirement(changeStreamRequirement),
facetRequirement(facetRequirement),
streamType(streamType) {
- // Stages which are allowed to run in $facet pipelines must not have any specific
- // position requirements.
- invariant(!isAllowedInsideFacetStage() ||
- requiredPosition == PositionRequirement::kNone);
+ // Stages which are allowed to run in $facet must not have any position requirements.
+ invariant(
+ !(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone));
+
+ // No change stream stages are permitted to run in a $facet pipeline.
+ invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage()));
+
+ // Only streaming stages are permitted in $changeStream pipelines.
+ invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking));
+
+ // A stage which is whitelisted for $changeStream cannot have a requirement to run on a
+ // shard, since it needs to be able to run on mongoS in a cluster.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ (hostRequirement == HostTypeRequirement::kAnyShard ||
+ hostRequirement == HostTypeRequirement::kPrimaryShard)));
+
+ // A stage which is whitelisted for $changeStream cannot have a position requirement.
+ invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
+ requiredPosition != PositionRequirement::kNone));
}
/**
@@ -214,12 +239,27 @@ public:
}
/**
- * True if this stage should be permitted to run in a $facet pipeline.
+ * True if this stage is permitted to run in a $facet pipeline.
*/
bool isAllowedInsideFacetStage() const {
return facetRequirement == FacetRequirement::kAllowed;
}
+ /**
+ * True if this stage is permitted to run in a pipeline which starts with $changeStream.
+ */
+ bool isAllowedInChangeStream() const {
+ return changeStreamRequirement != ChangeStreamRequirement::kBlacklist;
+ }
+
+ /**
+ * True if this stage is itself a $changeStream stage, and is therefore implicitly allowed
+ * to run in a pipeline which begins with $changeStream.
+ */
+ bool isChangeStreamStage() const {
+ return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
+ }
+
// Indicates whether this stage needs to be at a particular position in the pipeline.
const PositionRequirement requiredPosition;
@@ -231,6 +271,10 @@ public:
// files if its memory usage becomes excessive.
const DiskUseRequirement diskRequirement;
+ // Indicates whether this stage is itself a $changeStream stage, or if not whether it may
+ // exist in a pipeline which begins with $changeStream.
+ const ChangeStreamRequirement changeStreamRequirement;
+
// Indicates whether this stage may run inside a $facet stage.
const FacetRequirement facetRequirement;
@@ -253,6 +297,7 @@ public:
bool canSwapWithMatch = false;
};
+ using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PositionRequirement = StageConstraints::PositionRequirement;
using DiskUseRequirement = StageConstraints::DiskUseRequirement;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index c8641d5c943..64d01baf450 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -106,7 +106,8 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints(
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
/**
@@ -156,7 +157,8 @@ public:
(pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 8f5877887cb..956135f9d7a 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -64,7 +64,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -101,7 +102,8 @@ public:
(pipeState == Pipeline::SplitState::kUnsplit ? HostTypeRequirement::kNone
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 75518fa0dec..18d38b52fdb 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -66,7 +66,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage);
constraints.canSwapWithMatch = true;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index d6705fc223b..eabc4de92ae 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -55,7 +55,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ ChangeStreamRequirement::kWhitelist};
}
Value serialize(
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index 0e13e9e48d3..c6c83794aec 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -45,7 +45,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ ChangeStreamRequirement::kWhitelist};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 557ec06c9f2..70403cdf3c2 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -102,11 +102,18 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
+ StageConstraints constraints(
+ StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
+ ? FacetRequirement::kNotAllowed
+ : FacetRequirement::kAllowed),
+ (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
+ ? ChangeStreamRequirement::kChangeStreamStage
+ : ChangeStreamRequirement::kWhitelist));
+
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index e8cb7f25f94..1b6f69d823e 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -68,7 +68,9 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
_mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed);
+ _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed,
+ _mergingPresorted ? ChangeStreamRequirement::kWhitelist
+ : ChangeStreamRequirement::kBlacklist);
// Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
constraints.canSwapWithMatch = !limitSrc;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 4fedca608ea..2053f6ac5b5 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,7 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
@@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const {
} else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
- const auto firstStage = _sources.front().get();
+ const auto firstStageConstraints = _sources.front()->constraints(_splitState);
if (nss.isCollectionlessAggregateNS() &&
- !firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ !firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
- << firstStage->getSourceName()
+ << _sources.front()->getSourceName()
<< "'; a collection is required.");
}
if (!nss.isCollectionlessAggregateNS() &&
- firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
- str::stream() << "'" << firstStage->getSourceName()
+ str::stream() << "'" << _sources.front()->getSourceName()
<< "' can only be run with {aggregate: 1}");
}
+
+ // If the first stage is a $changeStream stage, then all stages in the pipeline must be
+ // either $changeStream stages or whitelisted as being able to run in a change stream.
+ if (firstStageConstraints.isChangeStreamStage()) {
+ for (auto&& source : _sources) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << source->getSourceName()
+ << " is not permitted in a $changeStream pipeline",
+ source->constraints(_splitState).isAllowedInChangeStream());
+ }
+ }
}
// Verify that each stage is in a legal position within the pipeline.