diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 63dae04e325..31cd015ea97 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -32,6 +32,8 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_optimizations.h" +#include <algorithm> + #include "mongo/base/error_codes.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/document_validation.h" @@ -134,14 +136,16 @@ Status Pipeline::validatePipeline() const { // {aggregate: 1} is only valid for collectionless sources, and vice-versa. const auto firstStage = _sources.front().get(); - if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { + if (nss.isCollectionlessAggregateNS() && + !firstStage->constraints().isIndependentOfAnyCollection) { return {ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" << firstStage->getSourceName() << "'; a collection is required."}; } - if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { + if (!nss.isCollectionlessAggregateNS() && + firstStage->constraints().isIndependentOfAnyCollection) { return {ErrorCodes::InvalidNamespace, str::stream() << "'" << firstStage->getSourceName() << "' can only be run with {aggregate: 1}"}; @@ -154,11 +158,21 @@ Status Pipeline::validatePipeline() const { Status Pipeline::validateFacetPipeline() const { if (_sources.empty()) { - return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."}; - } else if (_sources.front()->isInitialSource()) { - return {ErrorCodes::BadValue, - str::stream() << _sources.front()->getSourceName() - << " is not allowed to be used within a $facet stage."}; + return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"}; + } + for (auto&& stage : _sources) { + auto stageConstraints = stage->constraints(); + if (!stageConstraints.isAllowedInsideFacetStage) { + return {ErrorCodes::BadValue, + str::stream() << stage->getSourceName() + << " is not allowed to be used within a $facet stage", + 40550}; + } + // We expect a stage within a $facet stage to have these properties. + invariant(stageConstraints.requiresInputDocSource); + invariant(!stageConstraints.isIndependentOfAnyCollection); + invariant(stageConstraints.requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kNone); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -170,10 +184,13 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->isInitialSource() && i != 0) { + if (stage->constraints().requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kFirst && + i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline."}; + << " is only valid as the first stage in a pipeline.", + 40549}; } auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); if (i != 0 && matchStage && matchStage->isTextQuery()) { @@ -182,8 +199,13 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { - return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; + if (stage->constraints().requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kLast && + i != _sources.size() - 1) { + return {ErrorCodes::BadValue, + str::stream() << stage->getSourceName() + << " can only be the final stage in the pipeline", + 40551}; } ++i; } @@ -373,12 +395,9 @@ BSONObj Pipeline::getInitialQuery() const { } bool Pipeline::needsPrimaryShardMerger() const { - for (auto&& source : _sources) { - if (source->needsPrimaryShard()) { - return true; - } - } - return false; + return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().mustRunOnPrimaryShardIfSharded; + }); } std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { |