summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp53
1 files changed, 36 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 63dae04e325..31cd015ea97 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
+#include <algorithm>
+
#include "mongo/base/error_codes.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
@@ -134,14 +136,16 @@ Status Pipeline::validatePipeline() const {
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStage = _sources.front().get();
- if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
+ if (nss.isCollectionlessAggregateNS() &&
+ !firstStage->constraints().isIndependentOfAnyCollection) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
<< firstStage->getSourceName()
<< "'; a collection is required."};
}
- if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
+ if (!nss.isCollectionlessAggregateNS() &&
+ firstStage->constraints().isIndependentOfAnyCollection) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "'" << firstStage->getSourceName()
<< "' can only be run with {aggregate: 1}"};
@@ -154,11 +158,21 @@ Status Pipeline::validatePipeline() const {
Status Pipeline::validateFacetPipeline() const {
if (_sources.empty()) {
- return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."};
- } else if (_sources.front()->isInitialSource()) {
- return {ErrorCodes::BadValue,
- str::stream() << _sources.front()->getSourceName()
- << " is not allowed to be used within a $facet stage."};
+ return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"};
+ }
+ for (auto&& stage : _sources) {
+ auto stageConstraints = stage->constraints();
+ if (!stageConstraints.isAllowedInsideFacetStage) {
+ return {ErrorCodes::BadValue,
+ str::stream() << stage->getSourceName()
+ << " is not allowed to be used within a $facet stage",
+ 40550};
+ }
+ // We expect a stage within a $facet stage to have these properties.
+ invariant(stageConstraints.requiresInputDocSource);
+ invariant(!stageConstraints.isIndependentOfAnyCollection);
+ invariant(stageConstraints.requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kNone);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -170,10 +184,13 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->isInitialSource() && i != 0) {
+ if (stage->constraints().requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kFirst &&
+ i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
- << " is only valid as the first stage in a pipeline."};
+ << " is only valid as the first stage in a pipeline.",
+ 40549};
}
auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
if (i != 0 && matchStage && matchStage->isTextQuery()) {
@@ -182,8 +199,13 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) {
- return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"};
+ if (stage->constraints().requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kLast &&
+ i != _sources.size() - 1) {
+ return {ErrorCodes::BadValue,
+ str::stream() << stage->getSourceName()
+ << " can only be the final stage in the pipeline",
+ 40551};
}
++i;
}
@@ -373,12 +395,9 @@ BSONObj Pipeline::getInitialQuery() const {
}
bool Pipeline::needsPrimaryShardMerger() const {
- for (auto&& source : _sources) {
- if (source->needsPrimaryShard()) {
- return true;
- }
- }
- return false;
+ return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().mustRunOnPrimaryShardIfSharded;
+ });
}
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {