diff options
author | Romans Kasperovics <romans.kasperovics@mongodb.com> | 2023-02-10 01:32:07 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-10 05:10:40 +0000 |
commit | 1de0264c6f0d48a21339e6a6a6a54b6b1e916f85 (patch) | |
tree | f10039e6b4e7a2278e6704b5450609746ce079e4 | |
parent | 5300decfd223ba6cf5766faafe0603ee1ca6961d (diff) | |
download | mongo-1de0264c6f0d48a21339e6a6a6a54b6b1e916f85.tar.gz |
SERVER-71839 Add appear-once and custom position stage constraints
-rw-r--r-- | jstests/aggregation/sources/multiple_unpack_bucket_error.js | 28 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_geo_near.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stage_constraints.h | 16 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 5 |
10 files changed, 83 insertions, 46 deletions
diff --git a/jstests/aggregation/sources/multiple_unpack_bucket_error.js b/jstests/aggregation/sources/multiple_unpack_bucket_error.js index 3b5f20bf631..b00c6265f9b 100644 --- a/jstests/aggregation/sources/multiple_unpack_bucket_error.js +++ b/jstests/aggregation/sources/multiple_unpack_bucket_error.js @@ -13,36 +13,48 @@ coll.drop(); assert.commandFailedWithCode(db.runCommand({ aggregate: coll.getName(), pipeline: [ - {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}}, - {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}} + { + $_internalUnpackBucket: + {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)} + }, + { + $_internalUnpackBucket: + {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)} + } ], cursor: {} }), - 5348304); + 7183900); // $_unpackBucket is an alias of $_internalUnpackBucket, the same restriction should apply. assert.commandFailedWithCode(db.runCommand({ aggregate: coll.getName(), pipeline: [ {$_unpackBucket: {timeField: 'time'}}, - {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}} + { + $_internalUnpackBucket: + {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)} + } ], cursor: {} }), - 5348304); + 7183900); assert.commandFailedWithCode(db.runCommand({ aggregate: coll.getName(), pipeline: [ - {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}}, + { + $_internalUnpackBucket: + {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)} + }, {$_unpackBucket: {timeField: 'time'}} ], cursor: {} }), - 5348304); + 7183900); assert.commandFailedWithCode(db.runCommand({ aggregate: coll.getName(), pipeline: [{$_unpackBucket: {timeField: 'time'}}, {$_unpackBucket: {timeField: 'time'}}], cursor: {} }), - 5348304); + 7183900); })(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 4618d913e81..f16d2cee632 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -383,6 +383,16 @@ public: Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const = 0; /** + * If a stage's StageConstraints::PositionRequirement is kCustom, then it should also override + * this method, which will be called by the validation process. + */ + virtual void validatePipelinePosition(bool alreadyOptimized, + Pipeline::SourceContainer::const_iterator pos, + const Pipeline::SourceContainer& container) const { + MONGO_UNIMPLEMENTED_TASSERT(7183905); + }; + + /** * Informs the stage that it is no longer needed and can release its resources. After dispose() * is called the stage must still be able to handle calls to getNext(), but can return kEOF. * diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index c3d2eed51fd..c2a2b838b81 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -51,7 +51,7 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const final { return {StreamType::kStreaming, - PositionRequirement::kFirstAfterOptimization, + PositionRequirement::kCustom, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, @@ -60,6 +60,17 @@ public: UnionRequirement::kAllowed}; } + void validatePipelinePosition(bool alreadyOptimized, + Pipeline::SourceContainer::const_iterator pos, + const Pipeline::SourceContainer& container) const final { + // This stage must be in the first position in the pipeline after optimization. + uassert(40603, + str::stream() << getSourceName() + << " was not the first stage in the pipeline after optimization. Is " + "optimization disabled or inhibited?", + !alreadyOptimized || pos == container.cbegin()); + } + /** * DocumentSourceGeoNear should always be replaced by a DocumentSourceGeoNearCursor before * executing a pipeline, so this method should never be called. diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index 7ec99287ebc..7d3be11e85e 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -105,6 +105,8 @@ public: UnionRequirement::kAllowed, ChangeStreamRequirement::kDenylist}; constraints.canSwapWithMatch = true; + // The user cannot specify multiple $unpackBucket stages in the pipeline. + constraints.canAppearOnlyOnceInPipeline = true; return constraints; } diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp index b7ca9c6310e..f5760429206 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp @@ -157,7 +157,6 @@ void LiteParsedPipeline::tickGlobalStageCounters() const { void LiteParsedPipeline::validate(const OperationContext* opCtx, bool performApiVersionChecks) const { - int internalUnpackBucketCount = 0; for (auto&& stage : _stageSpecs) { const auto& stageName = stage->getParseTimeName(); const auto& stageInfo = LiteParsedDocumentSource::getInfo(stageName); @@ -179,25 +178,10 @@ void LiteParsedPipeline::validate(const OperationContext* opCtx, sometimesCallback); } - internalUnpackBucketCount += - (DocumentSourceInternalUnpackBucket::kStageNameInternal == stageName || - DocumentSourceInternalUnpackBucket::kStageNameExternal == stageName) - ? 1 - : 0; - for (auto&& subPipeline : stage->getSubPipelines()) { subPipeline.validate(opCtx, performApiVersionChecks); } } - - - // Validates that the pipeline contains at most one $_internalUnpackBucket or $_unpackBucket - // stage. - uassert(5348304, - str::stream() << "Encountered pipeline with more than one " - << DocumentSourceInternalUnpackBucket::kStageNameInternal << " or " - << DocumentSourceInternalUnpackBucket::kStageNameExternal << " stage", - internalUnpackBucketCount <= 1); } } // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index d02ff8ee70e..edd3716cf86 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -214,8 +214,7 @@ public: /** * Verifies that the pipeline contains valid stages. Optionally calls - * 'validatePipelineStagesforAPIVersion' with 'opCtx', and throws UserException if there is - * more than one $_internalUnpackBucket stage in the pipeline. + * 'validatePipelineStagesforAPIVersion' with 'opCtx'. */ void validate(const OperationContext* opCtx, bool performApiVersionChecks = true) const; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index d6851e46df3..c924eaa5592 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -234,39 +234,41 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create( } void Pipeline::validateCommon(bool alreadyOptimized) const { - size_t i = 0; - uassert(ErrorCodes::FailedToParse, str::stream() << "Pipeline length must be no longer than " << internalPipelineLengthLimit << " stages", static_cast<int>(_sources.size()) <= internalPipelineLengthLimit); - for (auto&& stage : _sources) { + // Keep track of stages which can only appear once. + std::set<StringData> singleUseStages; + + for (auto sourceIter = _sources.begin(); sourceIter != _sources.end(); ++sourceIter) { + auto& stage = *sourceIter; auto constraints = stage->constraints(_splitState); // Verify that all stages adhere to their PositionRequirement constraints. uassert(40602, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline", - !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0)); - uassert(40603, - str::stream() << stage->getSourceName() - << " is only valid as the first stage in an optimized pipeline", - !(alreadyOptimized && - constraints.requiredPosition == PositionRequirement::kFirstAfterOptimization && - i != 0)); + !(constraints.requiredPosition == PositionRequirement::kFirst && + sourceIter != _sources.begin())); + // TODO SERVER-73790: use PositionRequirement::kCustom to validate $match. auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); uassert(17313, "$match with $text is only allowed as the first pipeline stage", - !(i != 0 && matchStage && matchStage->isTextQuery())); + !(sourceIter != _sources.begin() && matchStage && matchStage->isTextQuery())); uassert(40601, str::stream() << stage->getSourceName() << " can only be the final stage in the pipeline", !(constraints.requiredPosition == PositionRequirement::kLast && - i != _sources.size() - 1)); - ++i; + std::next(sourceIter) != _sources.end())); + + // If the stage has a special requirement about its position, validate it. + if (constraints.requiredPosition == PositionRequirement::kCustom) { + stage->validatePipelinePosition(alreadyOptimized, sourceIter, _sources); + } // Verify that we are not attempting to run a mongoS-only stage on mongoD. uassert(40644, @@ -278,6 +280,12 @@ void Pipeline::validateCommon(bool alreadyOptimized) const { str::stream() << "Stage not supported inside of a multi-document transaction: " << stage->getSourceName(), !(pCtx->opCtx->inMultiDocumentTransaction() && !constraints.isAllowedInTransaction())); + + // Verify that a stage which can only appear once doesn't appear more than that. + uassert(7183900, + str::stream() << stage->getSourceName() << " can only be used once in the pipeline", + !(constraints.canAppearOnlyOnceInPipeline && + !singleUseStages.insert(stage->getSourceName()).second)); } } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f0d4d224f42..fd6b1f14589 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -1399,7 +1399,7 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll dynamic_cast<const DocumentSourceInternalUnpackBucket*>( iter->get())) { unpackIter = iter; - uassert(6505001, + tassert(6505001, str::stream() << "Expected at most one " << DocumentSourceInternalUnpackBucket::kStageNameInternal diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index 17537069d51..35f174be2a4 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -55,11 +55,10 @@ struct StageConstraints { enum class PositionRequirement { kNone, kFirst, - // User can specify this stage anywhere, as long as the system can move the stage to be - // first. If pipeline optimization is disabled, then the stage must be first prior to - // optimization. - kFirstAfterOptimization, - kLast + kLast, + // Stages with 'kCustom' requirement must also implement the 'validatePipelinePosition()' + // method which is called during pipeline validation. + kCustom }; /** @@ -343,6 +342,11 @@ struct StageConstraints { // Indicates that a stage is allowed within a pipeline-stlye update. bool isAllowedWithinUpdatePipeline = false; + // If true, then this stage may only appear in the pipeline once, though it can appear at an + // arbitrary position. It is not necessary to consider this for stages which have a strict + // PositionRequirement, since the presence of a second stage will violate that constraint. + bool canAppearOnlyOnceInPipeline = false; + // Indicates that a stage does not modify anything to do with a sort and can be done before a // following merge sort. bool preservesOrderAndMetadata = false; @@ -358,6 +362,8 @@ struct StageConstraints { isIndependentOfAnyCollection == other.isIndependentOfAnyCollection && canSwapWithMatch == other.canSwapWithMatch && canSwapWithSkippingOrLimitingStage == other.canSwapWithSkippingOrLimitingStage && + canSwapWithSingleDocTransform == other.canSwapWithSingleDocTransform && + canAppearOnlyOnceInPipeline == other.canAppearOnlyOnceInPipeline && isAllowedWithinUpdatePipeline == other.isAllowedWithinUpdatePipeline && unionRequirement == other.unionRequirement && preservesOrderAndMetadata == other.preservesOrderAndMetadata; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 5537e4effb3..e4cea383b32 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -415,6 +415,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } pipeline->optimizePipeline(); + + // Validate the pipeline post-optimization. + const bool alreadyOptimized = true; + pipeline->validateCommon(alreadyOptimized); + return pipeline; }; |