diff options
author | Ian Boros <puppyofkosh@gmail.com> | 2019-05-13 18:19:38 -0400 |
---|---|---|
committer | Ian Boros <puppyofkosh@gmail.com> | 2019-05-20 15:50:22 -0400 |
commit | 31967340abb31476910730163c04782f2e915d01 (patch) | |
tree | e21db441abdfbdf707f66f640796e3d8ee6ab6a8 /src | |
parent | 24760ca934a3c7843731de117839070ddf7fd3cc (diff) | |
download | mongo-31967340abb31476910730163c04782f2e915d01.tar.gz |
SERVER-40949 add LookupAllowed stage constraint
This bans $merge and $sB from $lookup subpipelines
Diffstat (limited to 'src')
47 files changed, 227 insertions, 54 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 24e70b27ada..85001ac9c5b 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -223,6 +223,7 @@ env.CppUnitTest( 'document_source_geo_near_test.cpp', 'document_source_graph_lookup_test.cpp', 'document_source_group_test.cpp', + 'document_source_internal_split_pipeline_test.cpp', 'document_source_limit_test.cpp', 'document_source_lookup_change_post_image_test.cpp', 'document_source_lookup_test.cpp', diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index f3f67db8e0a..8bc5d419321 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -139,6 +139,7 @@ public: using FacetRequirement = StageConstraints::FacetRequirement; using StreamType = StageConstraints::StreamType; using TransactionRequirement = StageConstraints::TransactionRequirement; + using LookupRequirement = StageConstraints::LookupRequirement; /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 961ad0f9865..34b4c7ffc60 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -55,7 +55,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index c20dccfe277..0808068da50 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -120,6 +120,7 @@ StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipe DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); constraints.isIndependentOfAnyCollection = pExpCtx->ns.isCollectionlessAggregateNS() ? true : false; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index d21a2170157..0e5e6ee87ee 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -61,6 +61,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 3598ad97f51..1220286da9e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -128,6 +128,7 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints( DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); // This transformation could be part of a 'collectionless' change stream on an entire diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 5e5b14f69af..7f5564a4a38 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -54,6 +54,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index c91c9813157..d449595faf7 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -69,6 +69,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } @@ -124,6 +125,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 8cea821a23a..9573da4c514 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -81,7 +81,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed); + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 84340c63e71..13d59e8e368 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -117,7 +117,8 @@ public: : HostTypeRequirement::kAnyShard), DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed); + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index d1ca145b886..4baab7d91c7 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -59,7 +59,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 4b6b821be82..b3c5c4431c8 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -210,7 +210,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kNotAllowed}; } boost::optional<MergingLogic> mergingLogic() final { diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 3c9b55bb215..6974dfc44f2 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -107,6 +107,20 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele } return rawFacetPipelines; } + +StageConstraints::LookupRequirement computeLookupRequirement( + const std::vector<DocumentSourceFacet::FacetPipeline>& facets) { + for (auto&& facet : facets) { + const auto& sources = facet.pipeline->getSources(); + for (auto&& src : sources) { + if (!src->constraints().isAllowedInLookupPipeline()) { + return StageConstraints::LookupRequirement::kNotAllowed; + } + } + } + return StageConstraints::LookupRequirement::kAllowed; +} + } // namespace std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed::parse( @@ -273,7 +287,8 @@ StageConstraints DocumentSourceFacet::constraints(Pipeline::SplitState) const { host, std::get<StageConstraints::DiskUseRequirement>(diskAndTxnReq), FacetRequirement::kNotAllowed, - std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq)}; + std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq), + computeLookupRequirement(_facets)}; } bool DocumentSourceFacet::usedDisk() { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 90ee6eb6628..d9cbe0146a3 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -218,7 +218,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } DocumentSource::GetNextResult getNext() final { @@ -251,12 +252,15 @@ TEST_F(DocumentSourceFacetTest, PassthroughFacetDoesntRequireDiskAndIsOKInaTxn) class DocumentSourceWritesPersistentData final : public DocumentSourcePassthrough { public: StageConstraints constraints(Pipeline::SplitState) const final { - return {StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kWritesPersistentData, - FacetRequirement::kAllowed, - TransactionRequirement::kNotAllowed}; + return { + StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesPersistentData, + FacetRequirement::kAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + }; } static boost::intrusive_ptr<DocumentSourceWritesPersistentData> create() { @@ -724,7 +728,8 @@ public: HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { @@ -791,7 +796,8 @@ public: HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourcePrimaryShardTmpDataNoTxn> create() { @@ -799,6 +805,26 @@ public: } }; +/** + * A DocumentSource which cannot be used in a $lookup pipeline. + */ +class DocumentSourceBannedInLookup final : public DocumentSourcePassthrough { +public: + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, + LookupRequirement::kNotAllowed}; + } + + static boost::intrusive_ptr<DocumentSourceBannedInLookup> create() { + return new DocumentSourceBannedInLookup(); + } +}; + TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstraint) { auto ctx = getExpCtx(); @@ -810,9 +836,14 @@ TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstrai auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx)); + auto thirdPassthrough = DocumentSourceBannedInLookup::create(); + auto thirdPipeline = + unittest::assertGet(Pipeline::createFacetPipeline({thirdPassthrough}, ctx)); + std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("first", std::move(firstPipeline)); facets.emplace_back("second", std::move(secondPipeline)); + facets.emplace_back("third", std::move(thirdPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).hostRequirement == @@ -821,6 +852,8 @@ TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstrai StageConstraints::DiskUseRequirement::kWritesTmpData); ASSERT(facetStage->constraints(Pipeline::SplitState::kUnsplit).transactionRequirement == StageConstraints::TransactionRequirement::kNotAllowed); + ASSERT_FALSE( + facetStage->constraints(Pipeline::SplitState::kUnsplit).isAllowedInLookupPipeline()); } } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 749d9fee7cd..9f9e94ef5e2 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -55,7 +55,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 6449560875b..7454a998901 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -68,7 +68,10 @@ public: hostRequirement, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + hostRequirement == HostTypeRequirement::kMongoS + ? LookupRequirement::kNotAllowed + : LookupRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index b166d8e8335..f59843a1c16 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -123,7 +123,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 9b162ca4575..3073f4c54bc 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -76,7 +76,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed); + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index 209c39308b8..d0901f50998 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -59,7 +59,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } boost::optional<MergingLogic> mergingLogic() final { diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index b2b30952dd2..ef08fd54488 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -69,7 +69,9 @@ public: _mergeType, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + _mergeType == HostTypeRequirement::kMongoS ? LookupRequirement::kNotAllowed + : LookupRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline_test.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline_test.cpp new file mode 100644 index 00000000000..4b52c29e052 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline_test.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/intrusive_ptr.hpp> +#include <string> +#include <vector> + +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_internal_split_pipeline.h" +#include "mongo/db/pipeline/stage_constraints.h" + +namespace mongo { +namespace { +using DocumentSourceInternalSplitPipelineTest = AggregationContextFixture; + +TEST_F(DocumentSourceInternalSplitPipelineTest, NotAllowedInLookupIfMustRunOnMongos) { + auto expCtx = getExpCtx(); + auto split = DocumentSourceInternalSplitPipeline::create( + expCtx, StageConstraints::HostTypeRequirement::kMongoS); + ASSERT_FALSE(split->constraints().isAllowedInLookupPipeline()); + ASSERT(split->constraints().hostRequirement == StageConstraints::HostTypeRequirement::kMongoS); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 0a5ed1428a5..5105f2fa787 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -55,7 +55,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 45df30ac5d8..08f6defcd84 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -95,7 +95,8 @@ public: HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed); + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 57fcdf97a1e..9c04b35f935 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -109,7 +109,8 @@ public: HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed); + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index 42bfe4faf71..c88c2e63fbc 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -90,7 +90,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 8a3a97097d1..52c3b6b40b4 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -196,7 +196,8 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { hostRequirement, diskRequirement, FacetRequirement::kAllowed, - txnRequirement); + txnRequirement, + LookupRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; @@ -635,17 +636,16 @@ void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() { auto& sources = _resolvedIntrospectionPipeline->getSources(); - // Ensure that the pipeline does not contain a $changeStream stage. This check will be - // performed recursively on all sub-pipelines. - uassert(ErrorCodes::IllegalOperation, - "$changeStream is not allowed within a $lookup's pipeline", - sources.empty() || !sources.front()->constraints().isChangeStreamStage()); + auto it = std::find_if( + sources.begin(), sources.end(), [](const boost::intrusive_ptr<DocumentSource>& src) { + return !src->constraints().isAllowedInLookupPipeline(); + }); - // Ensure that the pipeline does not contain a $out stage. Since $out must be the last stage - // of a pipeline, we only need to check the last DocumentSource. + // For other stages, use a generic error. uassert(51047, - "$out is not allowed within a $lookup's pipeline", - sources.empty() || !sources.back()->constraints().writesPersistentData()); + str::stream() << (*it)->getSourceName() + << " is not allowed within a $lookup's sub-pipeline", + it == sources.end()); } void DocumentSourceLookUp::serializeToArray( diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 504ad359625..8920d207555 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -70,6 +70,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); constraints.canSwapWithMatch = true; diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index fbbf487c424..bc406def88e 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -255,7 +255,7 @@ TEST_F(ReplDocumentSourceLookUpTest, RejectsPipelineWithChangeStreamStage) { .firstElement(), expCtx), AssertionException, - ErrorCodes::IllegalOperation); + 51047); } TEST_F(ReplDocumentSourceLookUpTest, RejectsSubPipelineWithChangeStreamStage) { @@ -273,7 +273,7 @@ TEST_F(ReplDocumentSourceLookUpTest, RejectsSubPipelineWithChangeStreamStage) { .firstElement(), expCtx), AssertionException, - ErrorCodes::IllegalOperation); + 51047); } TEST_F(DocumentSourceLookUpTest, RejectsLocalFieldForeignFieldWhenPipelineIsSpecified) { diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 37b9038c0e5..5fc6d0b9ded 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -90,6 +90,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 037d2f54962..b440afac666 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -163,7 +163,8 @@ public: : HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed}; } boost::optional<MergingLogic> mergingLogic() final override { diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 4eece861727..dc6d501a210 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -56,7 +56,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 1078b6a83e5..674aea38657 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -145,7 +145,8 @@ public: hostTypeRequirement, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed}; } const NamespaceString& getOutputNs() const { diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index 9f5aed005b8..91283cea1ff 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -98,7 +98,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed}; constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index a05f9d2e4ff..bced5c25a01 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -47,6 +47,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 3a5c7231d50..7b1d45c1dcc 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -50,7 +50,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } DepsTracker::State getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 8c9a6e99733..0e24c6f420e 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -51,7 +51,8 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } boost::optional<MergingLogic> mergingLogic() final { diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index fa7d9133a2b..c08be4d311f 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -57,7 +57,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed); constraints.requiresInputDocSource = (_cache->isBuilding()); return constraints; diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 97f1d79ca2a..f486a7ce2d8 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -62,6 +62,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, ChangeStreamRequirement::kWhitelist); constraints.canSwapWithMatch = true; constraints.canSwapWithLimitAndSample = true; diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index d6bc4c88458..3b7b4b4cb0e 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -57,7 +57,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index eeca2371b6e..425698957c3 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -69,6 +69,7 @@ public: DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index 3c5ae5fc729..8f701a16403 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -60,7 +60,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } boost::optional<MergingLogic> mergingLogic() final { diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index 6bda5052d81..56542e5e4fa 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -52,7 +52,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed}; } virtual boost::optional<MergingLogic> mergingLogic() override { diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 7948f6fdcd6..0582001e910 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -54,7 +54,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index a386fc5bfb8..4453a0b559e 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2346,7 +2346,8 @@ public: HostTypeRequirement::kMongoS, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kNotAllowed}; } static boost::intrusive_ptr<DocumentSourceMustRunOnMongoS> create() { @@ -2530,7 +2531,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; return constraints; @@ -2625,7 +2627,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kNotAllowed}; + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceDisallowedInTransactions> create() { @@ -2697,7 +2700,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed}; + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed}; } }; diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index 716258b7f68..b5edef5ef5e 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -101,6 +101,11 @@ struct StageConstraints { */ enum class TransactionRequirement { kNotAllowed, kAllowed }; + /** + * Indicates whether or not this stage may be run as part of a $lookup pipeline. + */ + enum class LookupRequirement { kNotAllowed, kAllowed }; + using DiskUseAndTransactionRequirement = std::pair<DiskUseRequirement, TransactionRequirement>; /** @@ -145,6 +150,7 @@ struct StageConstraints { DiskUseRequirement diskRequirement, FacetRequirement facetRequirement, TransactionRequirement transactionRequirement, + LookupRequirement lookupRequirement, ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) : requiredPosition(requiredPosition), hostRequirement(hostRequirement), @@ -152,12 +158,19 @@ struct StageConstraints { changeStreamRequirement(changeStreamRequirement), facetRequirement(facetRequirement), transactionRequirement(transactionRequirement), + lookupRequirement(lookupRequirement), streamType(streamType) { // Stages which are allowed to run in $facet must not have any position requirements. invariant(!(isAllowedInsideFacetStage() && requiredPosition != PositionRequirement::kNone)); - // No change stream stages are permitted to run in a $facet pipeline. + // No change stream stages are permitted to run in a $facet or $lookup pipelines. invariant(!(isChangeStreamStage() && isAllowedInsideFacetStage())); + invariant(!(isChangeStreamStage() && isAllowedInLookupPipeline())); + + // Stages which write persistent data cannot be used in a $lookup pipeline. + invariant(!(isAllowedInLookupPipeline() && writesPersistentData())); + invariant( + !(isAllowedInLookupPipeline() && hostRequirement == HostTypeRequirement::kMongoS)); // Only streaming stages are permitted in $changeStream pipelines. invariant(!(isAllowedInChangeStream() && streamType == StreamType::kBlocking)); @@ -237,6 +250,13 @@ struct StageConstraints { } /** + * Returns true if this stage may be used inside a $lookup subpipeline. + */ + bool isAllowedInLookupPipeline() const { + return lookupRequirement == LookupRequirement::kAllowed; + } + + /** * Returns true if this stage writes persistent data to disk. */ bool writesPersistentData() const { @@ -265,6 +285,9 @@ struct StageConstraints { // aggregate is running inside of a multi-document transaction. const TransactionRequirement transactionRequirement; + // Indicates whether this stage is allowed in a $lookup subpipeline. + const LookupRequirement lookupRequirement; + // Indicates whether this is a streaming or blocking stage. const StreamType streamType; diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index d3273b08123..efc3d707a53 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -88,7 +88,8 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - TransactionRequirement::kAllowed); + TransactionRequirement::kAllowed, + LookupRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 2f77bb0d602..90a68fc230d 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -69,6 +69,7 @@ public: DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } |