summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
authorNicholas Zolnierz <nicholas.zolnierz@mongodb.com>2020-02-19 22:58:38 +0000
committerevergreen <evergreen@mongodb.com>2020-02-19 22:58:38 +0000
commitb06b7d7dc5badc18c2977ee22ecb8ad339f5f27a (patch)
tree63fc225aee7ed58fd3bd59f310acb181a2d04b67 /src/mongo/db/pipeline/pipeline.cpp
parentc54a777a4a154984f5595b11993d7d009350a38c (diff)
downloadmongo-b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a.tar.gz
SERVER-46015 Cleanup Pipeline parsing for aggregation stages with child pipelines
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp210
1 files changed, 86 insertions, 124 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index bd8cf53df0c..eb3a4d97914 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -69,6 +69,54 @@ Value appendExecStats(Value docSource, const CommonStats& stats) {
doc.addField("executionTimeMillisEstimate", Value(executionTimeMillisEstimate));
return Value(doc.freeze());
}
+
+/**
+ * Performs validation checking specific to top-level pipelines. Throws an assertion if the
+ * pipeline is invalid.
+ */
+void validateTopLevelPipeline(const Pipeline& pipeline) {
+ // Verify that the specified namespace is valid for the initial stage of this pipeline.
+ const NamespaceString& nss = pipeline.getContext()->ns;
+
+ auto sources = pipeline.getSources();
+
+ if (sources.empty()) {
+ uassert(ErrorCodes::InvalidNamespace,
+ "{aggregate: 1} is not valid for an empty pipeline.",
+ !nss.isCollectionlessAggregateNS());
+ return;
+ }
+
+ if ("$mergeCursors"_sd != sources.front()->getSourceName()) {
+ // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
+ // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
+ const auto firstStageConstraints = sources.front()->constraints();
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "{aggregate: 1} is not valid for '"
+ << sources.front()->getSourceName() << "'; a collection is required.",
+ !(nss.isCollectionlessAggregateNS() &&
+ !firstStageConstraints.isIndependentOfAnyCollection));
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "'" << sources.front()->getSourceName()
+ << "' can only be run with {aggregate: 1}",
+ !(!nss.isCollectionlessAggregateNS() &&
+ firstStageConstraints.isIndependentOfAnyCollection));
+
+ // If the first stage is a $changeStream stage, then all stages in the pipeline must be
+ // either $changeStream stages or whitelisted as being able to run in a change stream.
+ if (firstStageConstraints.isChangeStreamStage()) {
+ for (auto&& source : sources) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << source->getSourceName()
+ << " is not permitted in a $changeStream pipeline",
+ source->constraints().isAllowedInChangeStream());
+ }
+ }
+ }
+}
+
} // namespace
MONGO_FAIL_POINT_DEFINE(disablePipelineOptimization);
@@ -100,20 +148,10 @@ Pipeline::~Pipeline() {
invariant(_disposed);
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse(
- const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
- return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline(
- const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
- return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
- const bool isFacetPipeline) {
+ PipelineValidatorCallback validator) {
SourceContainer stages;
@@ -122,105 +160,32 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOr
stages.insert(stages.end(), parsedSources.begin(), parsedSources.end());
}
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create(
- SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline(
- SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline(
- SourceContainer stages,
- const intrusive_ptr<ExpressionContext>& expCtx,
- const bool isFacetPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
PipelineDeleter(expCtx->opCtx));
- try {
- pipeline->validate(isFacetPipeline);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- pipeline->stitch();
- return std::move(pipeline);
-}
-void Pipeline::validate(bool isFacetPipeline) const {
- if (isFacetPipeline) {
- validateFacetPipeline();
- } else {
- validateTopLevelPipeline();
+ // First call the context-specific validator, which may be different for top-level pipelines
+ // versus nested pipelines.
+ if (validator)
+ validator(*pipeline);
+ else {
+ validateTopLevelPipeline(*pipeline);
}
- validateCommon();
-}
-
-void Pipeline::validateTopLevelPipeline() const {
- // Verify that the specified namespace is valid for the initial stage of this pipeline.
- const NamespaceString& nss = pCtx->ns;
+ // Next run through the common validation rules that apply to every pipeline.
+ pipeline->validateCommon();
- if (_sources.empty()) {
- if (nss.isCollectionlessAggregateNS()) {
- uasserted(ErrorCodes::InvalidNamespace,
- "{aggregate: 1} is not valid for an empty pipeline.");
- }
- return;
- }
- if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
- // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
- // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
- const auto firstStageConstraints = _sources.front()->constraints(_splitState);
-
- if (nss.isCollectionlessAggregateNS() &&
- !firstStageConstraints.isIndependentOfAnyCollection) {
- uasserted(ErrorCodes::InvalidNamespace,
- str::stream()
- << "{aggregate: 1} is not valid for '"
- << _sources.front()->getSourceName() << "'; a collection is required.");
- }
-
- if (!nss.isCollectionlessAggregateNS() &&
- firstStageConstraints.isIndependentOfAnyCollection) {
- uasserted(ErrorCodes::InvalidNamespace,
- str::stream() << "'" << _sources.front()->getSourceName()
- << "' can only be run with {aggregate: 1}");
- }
-
- // If the first stage is a $changeStream stage, then all stages in the pipeline must be
- // either $changeStream stages or whitelisted as being able to run in a change stream.
- if (firstStageConstraints.isChangeStreamStage()) {
- for (auto&& source : _sources) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << source->getSourceName()
- << " is not permitted in a $changeStream pipeline",
- source->constraints(_splitState).isAllowedInChangeStream());
- }
- }
- }
+ pipeline->stitch();
+ return pipeline;
}
-void Pipeline::validateFacetPipeline() const {
- if (_sources.empty()) {
- uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty");
- }
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create(
+ SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
+ PipelineDeleter(expCtx->opCtx));
- for (auto&& stage : _sources) {
- auto stageConstraints = stage->constraints(_splitState);
- if (!stageConstraints.isAllowedInsideFacetStage()) {
- uasserted(40600,
- str::stream() << stage->getSourceName()
- << " is not allowed to be used within a $facet stage");
- }
- // We expect a stage within a $facet stage to have these properties.
- invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
- invariant(!stageConstraints.isIndependentOfAnyCollection);
- }
+ pipeline->validateCommon();
+ pipeline->stitch();
+ return pipeline;
}
void Pipeline::validateCommon() const {
@@ -229,22 +194,21 @@ void Pipeline::validateCommon() const {
auto constraints = stage->constraints(_splitState);
// Verify that all stages adhere to their PositionRequirement constraints.
- if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) {
- uasserted(40602,
- str::stream() << stage->getSourceName()
- << " is only valid as the first stage in a pipeline.");
- }
- auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
- if (i != 0 && matchStage && matchStage->isTextQuery()) {
- uasserted(17313, "$match with $text is only allowed as the first pipeline stage");
- }
+ uassert(40602,
+ str::stream() << stage->getSourceName()
+ << " is only valid as the first stage in a pipeline.",
+ !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0));
- if (constraints.requiredPosition == PositionRequirement::kLast &&
- i != _sources.size() - 1) {
- uasserted(40601,
- str::stream() << stage->getSourceName()
- << " can only be the final stage in the pipeline");
- }
+ auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
+ uassert(17313,
+ "$match with $text is only allowed as the first pipeline stage",
+ !(i != 0 && matchStage && matchStage->isTextQuery()));
+
+ uassert(40601,
+ str::stream() << stage->getSourceName()
+ << " can only be the final stage in the pipeline",
+ !(constraints.requiredPosition == PositionRequirement::kLast &&
+ i != _sources.size() - 1));
++i;
// Verify that we are not attempting to run a mongoS-only stage on mongoD.
@@ -252,12 +216,10 @@ void Pipeline::validateCommon() const {
str::stream() << stage->getSourceName() << " can only be run on mongoS",
!(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
- if (pCtx->inMultiDocumentTransaction) {
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Stage not supported inside of a multi-document transaction: "
- << stage->getSourceName(),
- constraints.isAllowedInTransaction());
- }
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Stage not supported inside of a multi-document transaction: "
+ << stage->getSourceName(),
+ !(pCtx->inMultiDocumentTransaction && !constraints.isAllowedInTransaction()));
}
}
@@ -661,7 +623,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (opts.optimize) {
pipeline->optimizePipeline();