summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRomans Kasperovics <romans.kasperovics@mongodb.com>2023-02-10 01:32:07 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-10 05:10:40 +0000
commit1de0264c6f0d48a21339e6a6a6a54b6b1e916f85 (patch)
treef10039e6b4e7a2278e6704b5450609746ce079e4
parent5300decfd223ba6cf5766faafe0603ee1ca6961d (diff)
downloadmongo-1de0264c6f0d48a21339e6a6a6a54b6b1e916f85.tar.gz
SERVER-71839 Add appear-once and custom position stage constraints
-rw-r--r--jstests/aggregation/sources/multiple_unpack_bucket_error.js28
-rw-r--r--src/mongo/db/pipeline/document_source.h10
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h13
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.h2
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.cpp16
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h3
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp34
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp2
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h16
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp5
10 files changed, 83 insertions, 46 deletions
diff --git a/jstests/aggregation/sources/multiple_unpack_bucket_error.js b/jstests/aggregation/sources/multiple_unpack_bucket_error.js
index 3b5f20bf631..b00c6265f9b 100644
--- a/jstests/aggregation/sources/multiple_unpack_bucket_error.js
+++ b/jstests/aggregation/sources/multiple_unpack_bucket_error.js
@@ -13,36 +13,48 @@ coll.drop();
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [
- {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}},
- {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}}
+ {
+ $_internalUnpackBucket:
+ {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)}
+ },
+ {
+ $_internalUnpackBucket:
+ {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)}
+ }
],
cursor: {}
}),
- 5348304);
+ 7183900);
// $_unpackBucket is an alias of $_internalUnpackBucket, the same restriction should apply.
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [
{$_unpackBucket: {timeField: 'time'}},
- {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}}
+ {
+ $_internalUnpackBucket:
+ {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)}
+ }
],
cursor: {}
}),
- 5348304);
+ 7183900);
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [
- {$_internalUnpackBucket: {exclude: [], timeField: 'time', bucketMaxSpanSeconds: 3600}},
+ {
+ $_internalUnpackBucket:
+ {exclude: [], timeField: 'time', bucketMaxSpanSeconds: NumberInt(3600)}
+ },
{$_unpackBucket: {timeField: 'time'}}
],
cursor: {}
}),
- 5348304);
+ 7183900);
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [{$_unpackBucket: {timeField: 'time'}}, {$_unpackBucket: {timeField: 'time'}}],
cursor: {}
}),
- 5348304);
+ 7183900);
})();
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 4618d913e81..f16d2cee632 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -383,6 +383,16 @@ public:
Pipeline::SplitState = Pipeline::SplitState::kUnsplit) const = 0;
/**
+ * If a stage's StageConstraints::PositionRequirement is kCustom, then it should also override
+ * this method, which will be called by the validation process.
+ */
+ virtual void validatePipelinePosition(bool alreadyOptimized,
+ Pipeline::SourceContainer::const_iterator pos,
+ const Pipeline::SourceContainer& container) const {
+ MONGO_UNIMPLEMENTED_TASSERT(7183905);
+ };
+
+ /**
* Informs the stage that it is no longer needed and can release its resources. After dispose()
* is called the stage must still be able to handle calls to getNext(), but can return kEOF.
*
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index c3d2eed51fd..c2a2b838b81 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -51,7 +51,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return {StreamType::kStreaming,
- PositionRequirement::kFirstAfterOptimization,
+ PositionRequirement::kCustom,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
@@ -60,6 +60,17 @@ public:
UnionRequirement::kAllowed};
}
+ void validatePipelinePosition(bool alreadyOptimized,
+ Pipeline::SourceContainer::const_iterator pos,
+ const Pipeline::SourceContainer& container) const final {
+ // This stage must be in the first position in the pipeline after optimization.
+ uassert(40603,
+ str::stream() << getSourceName()
+ << " was not the first stage in the pipeline after optimization. Is "
+ "optimization disabled or inhibited?",
+ !alreadyOptimized || pos == container.cbegin());
+ }
+
/**
* DocumentSourceGeoNear should always be replaced by a DocumentSourceGeoNearCursor before
* executing a pipeline, so this method should never be called.
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 7ec99287ebc..7d3be11e85e 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -105,6 +105,8 @@ public:
UnionRequirement::kAllowed,
ChangeStreamRequirement::kDenylist};
constraints.canSwapWithMatch = true;
+ // The user cannot specify multiple $unpackBucket stages in the pipeline.
+ constraints.canAppearOnlyOnceInPipeline = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
index b7ca9c6310e..f5760429206 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
@@ -157,7 +157,6 @@ void LiteParsedPipeline::tickGlobalStageCounters() const {
void LiteParsedPipeline::validate(const OperationContext* opCtx,
bool performApiVersionChecks) const {
- int internalUnpackBucketCount = 0;
for (auto&& stage : _stageSpecs) {
const auto& stageName = stage->getParseTimeName();
const auto& stageInfo = LiteParsedDocumentSource::getInfo(stageName);
@@ -179,25 +178,10 @@ void LiteParsedPipeline::validate(const OperationContext* opCtx,
sometimesCallback);
}
- internalUnpackBucketCount +=
- (DocumentSourceInternalUnpackBucket::kStageNameInternal == stageName ||
- DocumentSourceInternalUnpackBucket::kStageNameExternal == stageName)
- ? 1
- : 0;
-
for (auto&& subPipeline : stage->getSubPipelines()) {
subPipeline.validate(opCtx, performApiVersionChecks);
}
}
-
-
- // Validates that the pipeline contains at most one $_internalUnpackBucket or $_unpackBucket
- // stage.
- uassert(5348304,
- str::stream() << "Encountered pipeline with more than one "
- << DocumentSourceInternalUnpackBucket::kStageNameInternal << " or "
- << DocumentSourceInternalUnpackBucket::kStageNameExternal << " stage",
- internalUnpackBucketCount <= 1);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index d02ff8ee70e..edd3716cf86 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -214,8 +214,7 @@ public:
/**
* Verifies that the pipeline contains valid stages. Optionally calls
- * 'validatePipelineStagesforAPIVersion' with 'opCtx', and throws UserException if there is
- * more than one $_internalUnpackBucket stage in the pipeline.
+ * 'validatePipelineStagesforAPIVersion' with 'opCtx'.
*/
void validate(const OperationContext* opCtx, bool performApiVersionChecks = true) const;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index d6851e46df3..c924eaa5592 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -234,39 +234,41 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create(
}
void Pipeline::validateCommon(bool alreadyOptimized) const {
- size_t i = 0;
-
uassert(ErrorCodes::FailedToParse,
str::stream() << "Pipeline length must be no longer than "
<< internalPipelineLengthLimit << " stages",
static_cast<int>(_sources.size()) <= internalPipelineLengthLimit);
- for (auto&& stage : _sources) {
+ // Keep track of stages which can only appear once.
+ std::set<StringData> singleUseStages;
+
+ for (auto sourceIter = _sources.begin(); sourceIter != _sources.end(); ++sourceIter) {
+ auto& stage = *sourceIter;
auto constraints = stage->constraints(_splitState);
// Verify that all stages adhere to their PositionRequirement constraints.
uassert(40602,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline",
- !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0));
- uassert(40603,
- str::stream() << stage->getSourceName()
- << " is only valid as the first stage in an optimized pipeline",
- !(alreadyOptimized &&
- constraints.requiredPosition == PositionRequirement::kFirstAfterOptimization &&
- i != 0));
+ !(constraints.requiredPosition == PositionRequirement::kFirst &&
+ sourceIter != _sources.begin()));
+ // TODO SERVER-73790: use PositionRequirement::kCustom to validate $match.
auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
uassert(17313,
"$match with $text is only allowed as the first pipeline stage",
- !(i != 0 && matchStage && matchStage->isTextQuery()));
+ !(sourceIter != _sources.begin() && matchStage && matchStage->isTextQuery()));
uassert(40601,
str::stream() << stage->getSourceName()
<< " can only be the final stage in the pipeline",
!(constraints.requiredPosition == PositionRequirement::kLast &&
- i != _sources.size() - 1));
- ++i;
+ std::next(sourceIter) != _sources.end()));
+
+ // If the stage has a special requirement about its position, validate it.
+ if (constraints.requiredPosition == PositionRequirement::kCustom) {
+ stage->validatePipelinePosition(alreadyOptimized, sourceIter, _sources);
+ }
// Verify that we are not attempting to run a mongoS-only stage on mongoD.
uassert(40644,
@@ -278,6 +280,12 @@ void Pipeline::validateCommon(bool alreadyOptimized) const {
str::stream() << "Stage not supported inside of a multi-document transaction: "
<< stage->getSourceName(),
!(pCtx->opCtx->inMultiDocumentTransaction() && !constraints.isAllowedInTransaction()));
+
+ // Verify that a stage which can only appear once doesn't appear more than that.
+ uassert(7183900,
+ str::stream() << stage->getSourceName() << " can only be used once in the pipeline",
+ !(constraints.canAppearOnlyOnceInPipeline &&
+ !singleUseStages.insert(stage->getSourceName()).second));
}
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f0d4d224f42..fd6b1f14589 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -1399,7 +1399,7 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
dynamic_cast<const DocumentSourceInternalUnpackBucket*>(
iter->get())) {
unpackIter = iter;
- uassert(6505001,
+ tassert(6505001,
str::stream()
<< "Expected at most one "
<< DocumentSourceInternalUnpackBucket::kStageNameInternal
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
index 17537069d51..35f174be2a4 100644
--- a/src/mongo/db/pipeline/stage_constraints.h
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -55,11 +55,10 @@ struct StageConstraints {
enum class PositionRequirement {
kNone,
kFirst,
- // User can specify this stage anywhere, as long as the system can move the stage to be
- // first. If pipeline optimization is disabled, then the stage must be first prior to
- // optimization.
- kFirstAfterOptimization,
- kLast
+ kLast,
+ // Stages with 'kCustom' requirement must also implement the 'validatePipelinePosition()'
+ // method which is called during pipeline validation.
+ kCustom
};
/**
@@ -343,6 +342,11 @@ struct StageConstraints {
// Indicates that a stage is allowed within a pipeline-stlye update.
bool isAllowedWithinUpdatePipeline = false;
+ // If true, then this stage may only appear in the pipeline once, though it can appear at an
+ // arbitrary position. It is not necessary to consider this for stages which have a strict
+ // PositionRequirement, since the presence of a second stage will violate that constraint.
+ bool canAppearOnlyOnceInPipeline = false;
+
// Indicates that a stage does not modify anything to do with a sort and can be done before a
// following merge sort.
bool preservesOrderAndMetadata = false;
@@ -358,6 +362,8 @@ struct StageConstraints {
isIndependentOfAnyCollection == other.isIndependentOfAnyCollection &&
canSwapWithMatch == other.canSwapWithMatch &&
canSwapWithSkippingOrLimitingStage == other.canSwapWithSkippingOrLimitingStage &&
+ canSwapWithSingleDocTransform == other.canSwapWithSingleDocTransform &&
+ canAppearOnlyOnceInPipeline == other.canAppearOnlyOnceInPipeline &&
isAllowedWithinUpdatePipeline == other.isAllowedWithinUpdatePipeline &&
unionRequirement == other.unionRequirement &&
preservesOrderAndMetadata == other.preservesOrderAndMetadata;
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 5537e4effb3..e4cea383b32 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -415,6 +415,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
pipeline->optimizePipeline();
+
+ // Validate the pipeline post-optimization.
+ const bool alreadyOptimized = true;
+ pipeline->validateCommon(alreadyOptimized);
+
return pipeline;
};