diff options
47 files changed, 210 insertions, 39 deletions
diff --git a/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js b/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js new file mode 100644 index 00000000000..8877aa99740 --- /dev/null +++ b/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js @@ -0,0 +1,75 @@ +/** + * Tests for invalid usages of $unionWith, or invalid stages within the $unionWith sub-pipeline. + */ +(function() { +"use strict"; + +load("jstests/libs/fixture_helpers.js"); // For isReplSet() and isSharded(). + +const baseColl = db["base"]; +baseColl.drop(); +const unionColl = db["union"]; +unionColl.drop(); + +// Ensure the base collection exists. +assert.commandWorked(baseColl.insert({a: 1})); + +// Disallowed within an update pipeline. +assert.commandFailedWithCode(baseColl.update({a: 1}, [{$unionWith: unionColl.getName()}]), + ErrorCodes.InvalidOptions); + +function assertFailsWithCode(pipeline, errCode) { + assert.commandFailedWithCode(db.runCommand({ + aggregate: baseColl.getName(), + pipeline: pipeline, + cursor: {}, + }), + errCode); +} + +// Change streams are only supported against a replica set. +if (FixtureHelpers.isReplSet(db) || FixtureHelpers.isSharded(baseColl)) { + // Disallowed alongside a $changeStream. + assertFailsWithCode([{$changeStream: {}}, {$unionWith: unionColl.getName()}], + ErrorCodes.IllegalOperation); + + // Likewise, $changeStream is disallowed within a $unionWith sub-pipeline. + assertFailsWithCode( + [{$unionWith: {coll: unionColl.getName(), pipeline: [{$changeStream: {}}]}}], 31441); + + assert.commandFailedWithCode(db.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {}}, {$unionWith: unionColl.getName()}], + cursor: {}, + }), + ErrorCodes.IllegalOperation); +} + +// $unionWith sub-pipeline cannot contain stages which write data ($merge, $out). +let subPipe = [{$out: "some_out_coll"}]; +assertFailsWithCode([{$unionWith: {coll: unionColl.getName(), pipeline: subPipe}}], 31441); + +subPipe = [{ + $merge: { + into: {db: db.getName(), coll: "some_merge_coll"}, + whenMatched: "replace", + whenNotMatched: "fail" + } +}]; +assertFailsWithCode([{$unionWith: {coll: unionColl.getName(), pipeline: subPipe}}], 31441); + +// Test that collection-less stages are not allowed within the $unionWith sub-pipeline. +subPipe = [{$listCachedAndActiveUsers: {}}]; +assertFailsWithCode([{$unionWith: {coll: unionColl.getName(), pipeline: subPipe}}], + ErrorCodes.InvalidNamespace); + +subPipe = [{$listLocalSessions: {}}]; +assertFailsWithCode([{$unionWith: {coll: unionColl.getName(), pipeline: subPipe}}], + ErrorCodes.InvalidNamespace); + +if (FixtureHelpers.isSharded(baseColl)) { + subPipe = [{$currentOp: {localOps: true}}]; + assertFailsWithCode([{$unionWith: {coll: unionColl.getName(), pipeline: subPipe}}], + ErrorCodes.InvalidNamespace); +} +})(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 148d8ea59b1..5f956431c36 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -142,6 +142,7 @@ public: using StreamType = StageConstraints::StreamType; using TransactionRequirement = StageConstraints::TransactionRequirement; using LookupRequirement = StageConstraints::LookupRequirement; + using UnionRequirement = StageConstraints::UnionRequirement; /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 65e2cd960a2..8804b0df6c1 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -57,7 +57,8 @@ public: DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 1049a9b04aa..153bc685a70 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -121,6 +121,7 @@ StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipe FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); constraints.isIndependentOfAnyCollection = pExpCtx->ns.isCollectionlessAggregateNS() ? true : false; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index 8c9283f4400..5713af11d58 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -62,6 +62,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 9d86e364189..20ceab62bd3 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -129,6 +129,7 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints( FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); // This transformation could be part of a 'collectionless' change stream on an entire diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 1bc801caed8..a60fb991ead 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -55,6 +55,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index a3b766ff0cd..0e50658f89d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -71,6 +71,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } @@ -129,6 +130,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 5c20fdbc2dc..60401817729 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -83,7 +83,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index eebd9e639de..37abba0e53a 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -114,7 +114,10 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + (_showLocalOpsOnMongoS == LocalOpsMode::kLocalMongosOps + ? UnionRequirement::kNotAllowed + : UnionRequirement::kAllowed)); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 3db747e66db..cb8cbd183f0 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -59,7 +59,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index f77e5d14bde..ab69ed3b010 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -211,7 +211,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kNotAllowed}; + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index ed438ecf15e..88f926b9b41 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -117,6 +117,18 @@ StageConstraints::LookupRequirement computeLookupRequirement( return StageConstraints::LookupRequirement::kAllowed; } +StageConstraints::UnionRequirement computeUnionRequirement( + const std::vector<DocumentSourceFacet::FacetPipeline>& facets) { + for (auto&& facet : facets) { + for (auto&& src : facet.pipeline->getSources()) { + if (!src->constraints().isAllowedInUnionPipeline()) { + return StageConstraints::UnionRequirement::kNotAllowed; + } + } + } + return StageConstraints::UnionRequirement::kAllowed; +} + } // namespace std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed::parse( @@ -253,7 +265,8 @@ StageConstraints DocumentSourceFacet::constraints(Pipeline::SplitState) const { std::get<StageConstraints::DiskUseRequirement>(diskAndTxnReq), FacetRequirement::kNotAllowed, std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq), - computeLookupRequirement(_facets)}; + computeLookupRequirement(_facets), + computeUnionRequirement(_facets)}; } bool DocumentSourceFacet::usedDisk() { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 13b00decc38..98bb537d421 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -241,7 +241,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } DocumentSource::GetNextResult doGetNext() final { @@ -282,6 +283,7 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kAllowed, }; } @@ -738,7 +740,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { @@ -806,7 +809,8 @@ public: DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourcePrimaryShardTmpDataNoTxn> create() { @@ -826,7 +830,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kNotAllowed}; + LookupRequirement::kNotAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceBannedInLookup> create() { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index ae72b3fc027..a3e1be1776a 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -56,7 +56,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 85afcb9cf69..979c1dde866 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -97,7 +97,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index cff7787c07e..88b0a669b6e 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -125,7 +125,8 @@ public: DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 2d262232407..f43bff49df6 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -79,7 +79,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index 2c13583a78a..cf23b7735b6 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -60,7 +60,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { diff --git a/src/mongo/db/pipeline/document_source_internal_shard_filter.h b/src/mongo/db/pipeline/document_source_internal_shard_filter.h index 2a13791b793..803ff1be41a 100644 --- a/src/mongo/db/pipeline/document_source_internal_shard_filter.h +++ b/src/mongo/db/pipeline/document_source_internal_shard_filter.h @@ -60,6 +60,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kBlacklist); } diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index 5e1580ce4e0..6ac88f8a82d 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -71,7 +71,8 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, _mergeType == HostTypeRequirement::kMongoS ? LookupRequirement::kNotAllowed - : LookupRequirement::kAllowed}; + : LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } private: diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 8bdd83686fe..6fe5370b0c2 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -56,7 +56,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } const char* getSourceName() const final { diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index f49b9859fb5..40af292031d 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -94,7 +94,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kNotAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 426ab739c20..dbe545c3096 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -109,7 +109,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kNotAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index fa280ad09f7..86ee7c543fe 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -93,7 +93,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 701bf51ba3f..1c5d8c49319 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -217,7 +217,8 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { diskRequirement, FacetRequirement::kAllowed, txnRequirement, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.canSwapWithMatch = true; constraints.canSwapWithLimitAndSample = !_unwindSrc; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 40d50586025..91507a9ba83 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -71,6 +71,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); constraints.canSwapWithMatch = true; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index c35cdfcaadf..a684c4c82f8 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -90,6 +90,7 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed, + UnionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index fb93585112c..808836e0d77 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -125,7 +125,8 @@ public: DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed}; + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final override { diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index bfbd4ebd2fd..a8466f51fc4 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -96,7 +96,8 @@ public: DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed}; + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed}; } Value serialize( diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index f091a24f1b3..d706f4f3ba4 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -91,7 +91,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index e6a1fe5e207..4c91abdf442 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -68,7 +68,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index a4409c2e081..a45e2af8c9f 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -48,6 +48,7 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed, + UnionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 14c49978ee8..0d10552ca4c 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -50,7 +50,8 @@ public: DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } DepsTracker::State getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index d65880bc806..65521488158 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -52,7 +52,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index 15381735974..355a7d3437f 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -58,7 +58,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.requiresInputDocSource = (_cache->isBuilding()); return constraints; diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 8ef435bd776..323c628f60e 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -63,6 +63,7 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed, + UnionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist); constraints.canSwapWithMatch = true; constraints.canSwapWithLimitAndSample = true; diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 46ab8f46041..89dbeb95b11 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -58,7 +58,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } const char* getSourceName() const final { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 69ff2b8abb1..1db93f7b5ab 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -77,6 +77,7 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed, + UnionRequirement::kAllowed, ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index ef4b5225d0f..0e7d6d5a96c 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -62,7 +62,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index 89a91501d22..d0229416c71 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -56,7 +56,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } virtual boost::optional<DistributedPlanLogic> distributedPlanLogic() override { diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index 0bf34dbf376..ba0327a4be2 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -58,7 +58,19 @@ public: DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) - : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {} + : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) { + if (_pipeline) { + const auto& sources = _pipeline->getSources(); + auto it = std::find_if(sources.begin(), sources.end(), [](const auto& src) { + return !src->constraints().isAllowedInUnionPipeline(); + }); + + uassert(31441, + str::stream() << (*it)->getSourceName() + << " is not allowed within a $unionWith's sub-pipeline", + it == sources.end()); + } + } const char* getSourceName() const final { return kStageName.rawData(); @@ -80,7 +92,8 @@ public: TransactionRequirement::kNotAllowed, // The check to disallow $unionWith on a sharded collection within $lookup happens // outside of the constraints as long as the involved namespaces are reported correctly. - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); } DepsTracker::State getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index c3254179e88..de7269d7639 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -55,7 +55,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index a657912b6a7..5cc39e890bf 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2465,7 +2465,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kNotAllowed}; + LookupRequirement::kNotAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceMustRunOnMongoS> create() { @@ -2650,7 +2651,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed); + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; return constraints; @@ -2746,7 +2748,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kNotAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceDisallowedInTransactions> create() { @@ -2820,7 +2823,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kAllowed}; + LookupRequirement::kAllowed, + UnionRequirement::kAllowed}; } }; diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index da6133f11da..b4fee766281 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -106,6 +106,11 @@ struct StageConstraints { */ enum class LookupRequirement { kNotAllowed, kAllowed }; + /** + * Indicates whether or not this stage may be run as part of a $unionWith pipeline. + */ + enum class UnionRequirement { kNotAllowed, kAllowed }; + using DiskUseAndTransactionRequirement = std::pair<DiskUseRequirement, TransactionRequirement>; /** @@ -151,6 +156,7 @@ struct StageConstraints { FacetRequirement facetRequirement, TransactionRequirement transactionRequirement, LookupRequirement lookupRequirement, + UnionRequirement unionRequirement, ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) : requiredPosition(requiredPosition), hostRequirement(hostRequirement), @@ -159,6 +165,7 @@ struct StageConstraints { facetRequirement(facetRequirement), transactionRequirement(transactionRequirement), lookupRequirement(lookupRequirement), + unionRequirement(unionRequirement), streamType(streamType) { // Stages which are allowed to run in $facet must not have any position requirements. invariant(!(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); @@ -257,6 +264,13 @@ struct StageConstraints { } /** + * Returns true if this stage may be used inside a $unionWith subpipeline. + */ + bool isAllowedInUnionPipeline() const { + return unionRequirement == UnionRequirement::kAllowed; + } + + /** * Returns true if this stage writes persistent data to disk. */ bool writesPersistentData() const { @@ -288,6 +302,9 @@ struct StageConstraints { // Indicates whether this stage is allowed in a $lookup subpipeline. const LookupRequirement lookupRequirement; + // Indicates whether this stage is allowed in a $unionWith subpipeline. + const UnionRequirement unionRequirement; + // Indicates whether this is a streaming or blocking stage. const StreamType streamType; diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index f979c13a00c..1824d948949 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -91,7 +91,8 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, - LookupRequirement::kNotAllowed); + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index ff76d2ce90e..44f96d5501b 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -74,6 +74,7 @@ public: FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } |