summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-10-04 18:26:30 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-10-10 10:36:25 +0100
commitf74896d9aae90d0b31eeb06f37f59ca386c03570 (patch)
treea05779131bc9fd608d2e6c6c0608251ea7a21d81 /src/mongo/db/pipeline/pipeline.cpp
parent6ad292704df43031ee94f696e709bf6285376ed4 (diff)
downloadmongo-f74896d9aae90d0b31eeb06f37f59ca386c03570.tar.gz
SERVER-29137 Implement $changeStream whitelist
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp22
1 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 4fedca608ea..2053f6ac5b5 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,7 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
@@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const {
} else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) {
// The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
- const auto firstStage = _sources.front().get();
+ const auto firstStageConstraints = _sources.front()->constraints(_splitState);
if (nss.isCollectionlessAggregateNS() &&
- !firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ !firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
- << firstStage->getSourceName()
+ << _sources.front()->getSourceName()
<< "'; a collection is required.");
}
if (!nss.isCollectionlessAggregateNS() &&
- firstStage->constraints(_splitState).isIndependentOfAnyCollection) {
+ firstStageConstraints.isIndependentOfAnyCollection) {
uasserted(ErrorCodes::InvalidNamespace,
- str::stream() << "'" << firstStage->getSourceName()
+ str::stream() << "'" << _sources.front()->getSourceName()
<< "' can only be run with {aggregate: 1}");
}
+
+ // If the first stage is a $changeStream stage, then all stages in the pipeline must be
+ // either $changeStream stages or whitelisted as being able to run in a change stream.
+ if (firstStageConstraints.isChangeStreamStage()) {
+ for (auto&& source : _sources) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << source->getSourceName()
+ << " is not permitted in a $changeStream pipeline",
+ source->constraints(_splitState).isAllowedInChangeStream());
+ }
+ }
}
// Verify that each stage is in a legal position within the pipeline.