diff options
author | Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> | 2020-02-19 22:58:38 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-19 22:58:38 +0000 |
commit | b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a (patch) | |
tree | 63fc225aee7ed58fd3bd59f310acb181a2d04b67 /src/mongo/db/pipeline/pipeline.cpp | |
parent | c54a777a4a154984f5595b11993d7d009350a38c (diff) | |
download | mongo-b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a.tar.gz |
SERVER-46015 Cleanup Pipeline parsing for aggregation stages with child pipelines
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 210 |
1 files changed, 86 insertions, 124 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index bd8cf53df0c..eb3a4d97914 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -69,6 +69,54 @@ Value appendExecStats(Value docSource, const CommonStats& stats) { doc.addField("executionTimeMillisEstimate", Value(executionTimeMillisEstimate)); return Value(doc.freeze()); } + +/** + * Performs validation checking specific to top-level pipelines. Throws an assertion if the + * pipeline is invalid. + */ +void validateTopLevelPipeline(const Pipeline& pipeline) { + // Verify that the specified namespace is valid for the initial stage of this pipeline. + const NamespaceString& nss = pipeline.getContext()->ns; + + auto sources = pipeline.getSources(); + + if (sources.empty()) { + uassert(ErrorCodes::InvalidNamespace, + "{aggregate: 1} is not valid for an empty pipeline.", + !nss.isCollectionlessAggregateNS()); + return; + } + + if ("$mergeCursors"_sd != sources.front()->getSourceName()) { + // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, + // {aggregate: 1} is only valid for collectionless sources, and vice-versa. + const auto firstStageConstraints = sources.front()->constraints(); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "{aggregate: 1} is not valid for '" + << sources.front()->getSourceName() << "'; a collection is required.", + !(nss.isCollectionlessAggregateNS() && + !firstStageConstraints.isIndependentOfAnyCollection)); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "'" << sources.front()->getSourceName() + << "' can only be run with {aggregate: 1}", + !(!nss.isCollectionlessAggregateNS() && + firstStageConstraints.isIndependentOfAnyCollection)); + + // If the first stage is a $changeStream stage, then all stages in the pipeline must be + // either $changeStream stages or whitelisted as being able to run in a change stream. + if (firstStageConstraints.isChangeStreamStage()) { + for (auto&& source : sources) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << source->getSourceName() + << " is not permitted in a $changeStream pipeline", + source->constraints().isAllowedInChangeStream()); + } + } + } +} + } // namespace MONGO_FAIL_POINT_DEFINE(disablePipelineOptimization); @@ -100,20 +148,10 @@ Pipeline::~Pipeline() { invariant(_disposed); } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse( - const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { - return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline( - const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { - return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline( +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx, - const bool isFacetPipeline) { + PipelineValidatorCallback validator) { SourceContainer stages; @@ -122,105 +160,32 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOr stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); } - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create( - SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline( - SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline( - SourceContainer stages, - const intrusive_ptr<ExpressionContext>& expCtx, - const bool isFacetPipeline) { std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx), PipelineDeleter(expCtx->opCtx)); - try { - pipeline->validate(isFacetPipeline); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - pipeline->stitch(); - return std::move(pipeline); -} -void Pipeline::validate(bool isFacetPipeline) const { - if (isFacetPipeline) { - validateFacetPipeline(); - } else { - validateTopLevelPipeline(); + // First call the context-specific validator, which may be different for top-level pipelines + // versus nested pipelines. + if (validator) + validator(*pipeline); + else { + validateTopLevelPipeline(*pipeline); } - validateCommon(); -} - -void Pipeline::validateTopLevelPipeline() const { - // Verify that the specified namespace is valid for the initial stage of this pipeline. - const NamespaceString& nss = pCtx->ns; + // Next run through the common validation rules that apply to every pipeline. + pipeline->validateCommon(); - if (_sources.empty()) { - if (nss.isCollectionlessAggregateNS()) { - uasserted(ErrorCodes::InvalidNamespace, - "{aggregate: 1} is not valid for an empty pipeline."); - } - return; - } - if ("$mergeCursors"_sd != _sources.front()->getSourceName()) { - // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, - // {aggregate: 1} is only valid for collectionless sources, and vice-versa. - const auto firstStageConstraints = _sources.front()->constraints(_splitState); - - if (nss.isCollectionlessAggregateNS() && - !firstStageConstraints.isIndependentOfAnyCollection) { - uasserted(ErrorCodes::InvalidNamespace, - str::stream() - << "{aggregate: 1} is not valid for '" - << _sources.front()->getSourceName() << "'; a collection is required."); - } - - if (!nss.isCollectionlessAggregateNS() && - firstStageConstraints.isIndependentOfAnyCollection) { - uasserted(ErrorCodes::InvalidNamespace, - str::stream() << "'" << _sources.front()->getSourceName() - << "' can only be run with {aggregate: 1}"); - } - - // If the first stage is a $changeStream stage, then all stages in the pipeline must be - // either $changeStream stages or whitelisted as being able to run in a change stream. - if (firstStageConstraints.isChangeStreamStage()) { - for (auto&& source : _sources) { - uassert(ErrorCodes::IllegalOperation, - str::stream() << source->getSourceName() - << " is not permitted in a $changeStream pipeline", - source->constraints(_splitState).isAllowedInChangeStream()); - } - } - } + pipeline->stitch(); + return pipeline; } -void Pipeline::validateFacetPipeline() const { - if (_sources.empty()) { - uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"); - } +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create( + SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx), + PipelineDeleter(expCtx->opCtx)); - for (auto&& stage : _sources) { - auto stageConstraints = stage->constraints(_splitState); - if (!stageConstraints.isAllowedInsideFacetStage()) { - uasserted(40600, - str::stream() << stage->getSourceName() - << " is not allowed to be used within a $facet stage"); - } - // We expect a stage within a $facet stage to have these properties. - invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); - invariant(!stageConstraints.isIndependentOfAnyCollection); - } + pipeline->validateCommon(); + pipeline->stitch(); + return pipeline; } void Pipeline::validateCommon() const { @@ -229,22 +194,21 @@ void Pipeline::validateCommon() const { auto constraints = stage->constraints(_splitState); // Verify that all stages adhere to their PositionRequirement constraints. - if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) { - uasserted(40602, - str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline."); - } - auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); - if (i != 0 && matchStage && matchStage->isTextQuery()) { - uasserted(17313, "$match with $text is only allowed as the first pipeline stage"); - } + uassert(40602, + str::stream() << stage->getSourceName() + << " is only valid as the first stage in a pipeline.", + !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0)); - if (constraints.requiredPosition == PositionRequirement::kLast && - i != _sources.size() - 1) { - uasserted(40601, - str::stream() << stage->getSourceName() - << " can only be the final stage in the pipeline"); - } + auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); + uassert(17313, + "$match with $text is only allowed as the first pipeline stage", + !(i != 0 && matchStage && matchStage->isTextQuery())); + + uassert(40601, + str::stream() << stage->getSourceName() + << " can only be the final stage in the pipeline", + !(constraints.requiredPosition == PositionRequirement::kLast && + i != _sources.size() - 1)); ++i; // Verify that we are not attempting to run a mongoS-only stage on mongoD. @@ -252,12 +216,10 @@ void Pipeline::validateCommon() const { str::stream() << stage->getSourceName() << " can only be run on mongoS", !(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos)); - if (pCtx->inMultiDocumentTransaction) { - uassert(ErrorCodes::OperationNotSupportedInTransaction, - str::stream() << "Stage not supported inside of a multi-document transaction: " - << stage->getSourceName(), - constraints.isAllowedInTransaction()); - } + uassert(ErrorCodes::OperationNotSupportedInTransaction, + str::stream() << "Stage not supported inside of a multi-document transaction: " + << stage->getSourceName(), + !(pCtx->inMultiDocumentTransaction && !constraints.isAllowedInTransaction())); } } @@ -661,7 +623,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (opts.optimize) { pipeline->optimizePipeline(); |