diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-04 18:26:30 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-10 10:36:25 +0100 |
commit | f74896d9aae90d0b31eeb06f37f59ca386c03570 (patch) | |
tree | a05779131bc9fd608d2e6c6c0608251ea7a21d81 /src/mongo/db/pipeline/pipeline.cpp | |
parent | 6ad292704df43031ee94f696e709bf6285376ed4 (diff) | |
download | mongo-f74896d9aae90d0b31eeb06f37f59ca386c03570.tar.gz |
SERVER-29137 Implement $changeStream whitelist
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 4fedca608ea..2053f6ac5b5 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,7 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using ChangeStreamRequirement = DocumentSource::StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement; @@ -144,22 +145,33 @@ void Pipeline::validatePipeline() const { } else if (!dynamic_cast<DocumentSourceMergeCursors*>(_sources.front().get())) { // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, // {aggregate: 1} is only valid for collectionless sources, and vice-versa. - const auto firstStage = _sources.front().get(); + const auto firstStageConstraints = _sources.front()->constraints(_splitState); if (nss.isCollectionlessAggregateNS() && - !firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + !firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" - << firstStage->getSourceName() + << _sources.front()->getSourceName() << "'; a collection is required."); } if (!nss.isCollectionlessAggregateNS() && - firstStage->constraints(_splitState).isIndependentOfAnyCollection) { + firstStageConstraints.isIndependentOfAnyCollection) { uasserted(ErrorCodes::InvalidNamespace, - str::stream() << "'" << firstStage->getSourceName() + str::stream() << "'" << _sources.front()->getSourceName() << "' can only be run with {aggregate: 1}"); } + + // If the first stage is a $changeStream stage, then all stages in the pipeline must be + // either $changeStream stages or whitelisted as being able to run in a change stream. + if (firstStageConstraints.isChangeStreamStage()) { + for (auto&& source : _sources) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << source->getSourceName() + << " is not permitted in a $changeStream pipeline", + source->constraints(_splitState).isAllowedInChangeStream()); + } + } } // Verify that each stage is in a legal position within the pipeline. |