diff options
Diffstat (limited to 'src/mongo')
8 files changed, 244 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 153bc685a70..9af00b5773c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -139,10 +139,6 @@ Value DocumentSourceOplogMatch::serialize(optional<ExplainOptions::Verbosity> ex return Value(); } -DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter, - const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceMatch(std::move(filter), expCtx) {} - void DocumentSourceChangeStream::checkValueType(const Value v, const StringData filedName, BSONType expectedType) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 9570f2e16e1..9eeaaf19b9b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -29,12 +29,15 @@ #pragma once +#include <type_traits> + #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/resume_token.h" +#include "mongo/util/intrusive_counter.h" namespace mongo { @@ -210,6 +213,12 @@ private: */ class DocumentSourceOplogMatch final : public DocumentSourceMatch { public: + DocumentSourceOplogMatch(const DocumentSourceOplogMatch& other) : DocumentSourceMatch(other) {} + + virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const { + return make_intrusive<std::decay_t<decltype(*this)>>(*this); + } + static boost::intrusive_ptr<DocumentSourceOplogMatch> create( BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -228,7 +237,7 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; private: - DocumentSourceOplogMatch(BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); + using DocumentSourceMatch::DocumentSourceMatch; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index 86ee7c543fe..ed8dcdf177d 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -29,10 +29,13 @@ #pragma once +#include <type_traits> + #include "mongo/bson/bsonelement.h" #include "mongo/db/pipeline/document_source_list_local_sessions.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/util/intrusive_counter.h" namespace mongo { @@ -47,6 +50,13 @@ namespace mongo { */ class DocumentSourceListSessions final : public DocumentSourceMatch { public: + DocumentSourceListSessions(const DocumentSourceListSessions& other) + : DocumentSourceMatch(other) {} + + virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const { + return make_intrusive<std::decay_t<decltype(*this)>>(*this); + } + static constexpr StringData kStageName = "$listSessions"_sd; class LiteParsed final : public LiteParsedDocumentSource { diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index a684c4c82f8..ecdda5dc274 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -30,16 +30,24 @@ #pragma once #include <memory> +#include <type_traits> #include <utility> #include "mongo/client/connpool.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/util/intrusive_counter.h" namespace mongo { class DocumentSourceMatch : public DocumentSource { public: + template <typename T, typename... Args, typename> + friend boost::intrusive_ptr<T> make_intrusive(Args&&...); + virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const { + return make_intrusive<std::decay_t<decltype(*this)>>(*this); + } + static constexpr StringData kStageName = "$match"_sd; /** * Convenience method for creating a $match stage. @@ -170,6 +178,11 @@ public: } protected: + DocumentSourceMatch(const DocumentSourceMatch& other) + : DocumentSourceMatch( + other.serialize().getDocument().toBson().firstElement().embeddedObject(), + other.pExpCtx) {} + GetNextResult doGetNext() override; DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& expCtx); diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 323c628f60e..d983de72021 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -29,6 +29,8 @@ #pragma once +#include <type_traits> + #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/transformer_interface.h" @@ -42,6 +44,12 @@ namespace mongo { */ class DocumentSourceSingleDocumentTransformation final : public DocumentSource { public: + virtual boost::intrusive_ptr<DocumentSource> clone() const { + auto list = DocumentSource::parse(pExpCtx, serialize().getDocument().toBson()); + invariant(list.size() == 1); + return list.front(); + } + DocumentSourceSingleDocumentTransformation( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index a6a31657dbf..00f531bbf8d 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -32,15 +32,18 @@ #include <iterator> +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_union_with.h" #include "mongo/db/pipeline/document_source_union_with_gen.h" #include "mongo/util/log.h" namespace mongo { -REGISTER_TEST_DOCUMENT_SOURCE(unionWith, - DocumentSourceUnionWith::LiteParsed::parse, - DocumentSourceUnionWith::createFromBson); +REGISTER_DOCUMENT_SOURCE(unionWith, + DocumentSourceUnionWith::LiteParsed::parse, + DocumentSourceUnionWith::createFromBson); namespace { std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( @@ -116,6 +119,9 @@ PrivilegeVector DocumentSourceUnionWith::LiteParsed::requiredPrivileges( boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(ErrorCodes::NotImplemented, + "$unionWith can only be used if test commands are enabled", + getTestCommandsEnabled()); uassert(ErrorCodes::FailedToParse, str::stream() << "the $unionWith stage specification must be an object or string, but found " @@ -163,12 +169,23 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { return GetNextResult::makeEOF(); } -DocumentSource::GetModPathsReturn DocumentSourceUnionWith::getModifiedPaths() const { - // Since we might have a document arrive from the foreign pipeline with the same path as a - // document in the main pipeline. Without introspecting the sub-pipeline, we must report that - // all paths have been modified. - return {GetModPathsReturn::Type::kAllPaths, {}, {}}; -} +Pipeline::SourceContainer::iterator DocumentSourceUnionWith::doOptimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + auto duplicateAcrossUnion = [&](auto&& nextStage) { + _pipeline->addFinalSource(nextStage->clone()); + auto newStageItr = container->insert(itr, std::move(nextStage)); + container->erase(std::next(itr)); + return newStageItr == container->begin() ? newStageItr : std::prev(newStageItr); + }; + if (std::next(itr) != container->end()) { + if (auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get())) + return duplicateAcrossUnion(nextMatch); + else if (auto nextProject = dynamic_cast<DocumentSourceSingleDocumentTransformation*>( + (*std::next(itr)).get())) + return duplicateAcrossUnion(nextProject); + } + return std::next(itr); +}; void DocumentSourceUnionWith::doDispose() { if (_pipeline) { diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index ba0327a4be2..beb69c7e9e5 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -80,10 +80,15 @@ public: std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - GetModPathsReturn getModifiedPaths() const final; + GetModPathsReturn getModifiedPaths() const final { + // Since we might have a document arrive from the foreign pipeline with the same path as a + // document in the main pipeline. Without introspecting the sub-pipeline, we must report + // that all paths have been modified. + return {GetModPathsReturn::Type::kAllPaths, {}, {}}; + } StageConstraints constraints(Pipeline::SplitState) const final { - return StageConstraints( + auto constraints = StageConstraints( StreamType::kStreaming, PositionRequirement::kNone, HostTypeRequirement::kAnyShard, @@ -94,6 +99,13 @@ public: // outside of the constraints as long as the involved namespaces are reported correctly. LookupRequirement::kAllowed, UnionRequirement::kAllowed); + + // DocumentSourceUnionWith cannot directly swap with match but it contains custom logic in + // the doOptimizeAt() member function to allow itself to duplicate any match ahead in the + // current pipeline and place one copy inside its sub-pipeline and one copy behind in the + // current pipeline. + constraints.canSwapWithMatch = false; + return constraints; } DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -111,6 +123,15 @@ public: protected: GetNextResult doGetNext() final; + + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + boost::intrusive_ptr<DocumentSource> optimize() final { + _pipeline->optimizePipeline(); + return this; + } + void doDispose() final; private: diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 5cc39e890bf..06e3c06923f 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1853,6 +1853,160 @@ TEST(PipelineOptimizationTest, SortSkipProjSkipLimSkipLimBecomesTopKSortSkipProj assertPipelineOptimizesAndSerializesTo(inputPipe, outputPipe, serializedPipe); } +TEST(PipelineOptimizationTest, MatchGetsPushedIntoBothChildrenOfUnion) { + setTestCommandsEnabled(true); // TODO SERVER-45712 remove this line. + assertPipelineOptimizesTo( + "[" + " {$unionWith: 'coll'}," + " {$match: {x: {$eq: 2}}}" + "]", + "[{$match: {x: {$eq: 2}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$match: {x: {$eq: 2}}}]" + " }}]"); + + // Test that the $match can get pulled forward through other stages. + assertPipelineOptimizesAndSerializesTo( + "[" + " {$unionWith: 'coll'}," + " {$lookup: {from: 'lookupColl', as: 'y', localField: 'z', foreignField: 'z'}}," + " {$sort: {score: 1}}," + " {$match: {x: {$eq: 2}}}" + "]", + "[" + " {$match: {x: {$eq: 2}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$match: {x: {$eq: 2}}}]" + " }}," + " {$lookup: {from: 'lookupColl', as: 'y', localField: 'z', foreignField: 'z'}}," + " {$sort: {sortKey: {score: 1}}}" + "]", + "[" + " {$match: {x: {$eq: 2}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$match: {x: {$eq: 2}}}]" + " }}," + " {$lookup: {from: 'lookupColl', as: 'y', localField: 'z', foreignField: 'z'}}," + " {$sort: {score: 1}}" + "]"); + + // Test that the $match can get pulled forward from after the $unionWith to inside, then to the + // beginning of a $unionWith subpipeline. + // TODO: SERVER-45535 the explained inner pipeline should have the 'sortKey' form for $sort. + assertPipelineOptimizesAndSerializesTo( + "[" + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [" + " {$project: {y: false}}," + " {$sort: {score: 1}}" + " ]" + " }}," + " {$match: {x: {$eq: 2}}}" + "]", + "[" + " {$match: {x: {$eq: 2}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [" + " {$match: {x: {$eq: 2}}}," + " {$project: {y: false}}," + " {$sort: {score: 1}}" + " ]" + " }}" + "]", + "[" + " {$match: {x: {$eq: 2}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [" + " {$match: {x: {$eq: 2}}}," + " {$project: {y: false}}," + " {$sort: {score: 1}}" + " ]" + " }}" + "]"); +} + +TEST(PipelineOptimizationTest, ProjectGetsPushedIntoBothChildrenOfUnion) { + setTestCommandsEnabled(true); // TODO SERVER-45712 remove this line. + assertPipelineOptimizesTo( + "[" + " {$unionWith: 'coll'}," + " {$project: {x: false}}" + "]", + "[{$project: {x: false}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$project: {x: false}}]" + " }}]"); + + // Test an inclusion projection. + assertPipelineOptimizesTo( + "[" + " {$unionWith: 'coll'}," + " {$project: {x: true}}" + "]", + "[{$project: {_id: true, x: true}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$project: {_id: true, x: true}}]" + " }}]"); + + // Test a $set. + assertPipelineOptimizesTo( + "[" + " {$unionWith: 'coll'}," + " {$set: {x: 'new value'}}" + "]", + "[{$set: {x: {$const: 'new value'}}}," + " {$unionWith: {" + " coll: 'coll'," + " pipeline: [{$set: {x: {$const: 'new value'}}}]" + " }}]"); +} + +TEST(PipelineOptimizationTest, UnionWithViewsSampleUseCase) { + setTestCommandsEnabled(true); // TODO SERVER-45712 remove this line. + // Test that if someone uses $unionWith to query one logical collection from four physical + // collections then the query and projection can get pushed down to next to each collection + // access. + assertPipelineOptimizesTo( + "[" + " {$unionWith: 'Q2'}," + " {$unionWith: 'Q3'}," + " {$unionWith: 'Q4'}," + " {$match: {business: {$eq: 'good'}}}," + " {$project: {_id: true, x: true}}" + "]", + "[{$match: {business: {$eq: 'good'}}}," + " {$project: {_id: true, x: true}}," + " {$unionWith: {" + " coll: 'Q2'," + " pipeline: [" + " {$match: {business: {$eq: 'good'}}}," + " {$project: {_id: true, x: true}}" + " ]" + " }}," + " {$unionWith: {" + " coll: 'Q3'," + " pipeline: [" + " {$match: {business: {$eq: 'good'}}}," + " {$project: {_id: true, x: true}}" + " ]" + " }}," + " {$unionWith: {" + " coll: 'Q4'," + " pipeline: [" + " {$match: {business: {$eq: 'good'}}}," + " {$project: {_id: true, x: true}}" + " ]" + " }}" + "]"); +} } // namespace Local namespace Sharded { |