From 20c33aa11ae2925d66b2df81e2ddb6813d81400a Mon Sep 17 00:00:00 2001
From: Jacob Evans
Date: Wed, 20 Mar 2019 11:42:07 -0400
Subject: SERVER-40312 Create a generic tree for pipeline metadata

---
 src/mongo/db/pipeline/SConscript                 |   9 +
 src/mongo/db/pipeline/document_source_facet.h    |   4 +
 src/mongo/db/pipeline/document_source_lookup.h   |   4 +
 src/mongo/db/pipeline/pipeline.h                 |   4 +
 src/mongo/db/pipeline/pipeline_metadata_tree.h   | 279 ++++++++++++++++
 .../db/pipeline/pipeline_metadata_tree_test.cpp  | 363 +++++++++++++++++++++
 6 files changed, 663 insertions(+)
 create mode 100644 src/mongo/db/pipeline/pipeline_metadata_tree.h
 create mode 100644 src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp

diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e9183efb0eb..19c0dce4418 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -615,3 +615,12 @@ env.CppUnitTest(
         'document_sources_idl',
     ],
 )
+
+env.CppUnitTest(
+    target='pipeline_metadata_tree_test',
+    source='pipeline_metadata_tree_test.cpp',
+    LIBDEPS=[
+        '$BUILD_DIR/mongo/db/service_context_test_fixture',
+        'pipeline',
+    ],
+)
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 8c82ecf7cb1..74281663fb6 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -134,6 +134,10 @@ public:
         return _facets;
     }
 
+    auto& getFacetPipelines() {
+        return _facets;
+    }
+
     // The following are overridden just to forward calls to sub-pipelines.
     void addInvolvedCollections(stdx::unordered_set<NamespaceString>* involvedNssSet) const final;
     void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index f53dbcee3ad..1718ad5dc63 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -177,6 +177,10 @@ public:
         return *_resolvedIntrospectionPipeline;
     }
 
+    auto& getResolvedIntrospectionPipeline() {
+        return *_resolvedIntrospectionPipeline;
+    }
+
     const Variables& getVariables_forTest() {
         return _variables;
     }
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 849143dd94e..05e2b77977d 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -278,6 +278,10 @@ public:
         return _sources;
     }
 
+    SourceContainer& getSources() {
+        return _sources;
+    }
+
     /**
      * Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
      * empty.
diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree.h b/src/mongo/db/pipeline/pipeline_metadata_tree.h
new file mode 100644
index 00000000000..e444ecf2734
--- /dev/null
+++ b/src/mongo/db/pipeline/pipeline_metadata_tree.h
@@ -0,0 +1,279 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <boost/optional.hpp>
+#include <deque>
+#include <functional>
+#include <iterator>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/pipeline.h"
+
+/**
+ * A simple representation of an Aggregation Pipeline and functions for building it.
+ * PipelineMetadataTree has no default contents and reflects only the shape of stages. Each stage
+ * can be occupied by a specified type in order to have the tree carry any form of arbitrary
+ * metadata. A PipelineMetadataTree is intended to be zipped along with another representation of a
+ * pipeline in order to supplement the other representation's metadata.
+ */
+namespace mongo::pipeline_metadata_tree {
+
+/**
+ * An alternate representation of a stage in an Aggregation Pipeline which contains handles to all
+ * stages it depends on, forming a tree. Each Stage tracks a specific piece of metadata of type 'T'.
+ * Since Stage forms a tree rather than a DAG, there are no handles from $facet component pipelines
+ * to their owning $facet stage, but there are pointers from any $facet stage to its component
+ * pipelines.
+ */
+template <typename T>
+struct Stage {
+    /**
+     * Construct an individual Stage from its components.
+     */
+    Stage(T&& contents,
+          std::unique_ptr<Stage> principalChild,
+          std::vector<Stage>&& additionalChildren)
+        : contents(std::move(contents)),
+          principalChild(std::move(principalChild)),
+          additionalChildren(std::move(additionalChildren)) {}
+
+    /**
+     * Specification of the move constructor inhibits compiler generation of a copy constructor.
+     * This is intentional, since accidental copies could be detrimental to performance. This
+     * constructor is well formed only if the contents type 'T' also has a defined or defaulted
+     * move constructor. The correct definition of this constructor is essential for invoking
+     * 'makeTree'.
+     */
+    Stage(Stage&&) = default;
+
+    /**
+     * The move assignment operator is subject to the same conditions as the move constructor.
+     */
+    Stage& operator=(Stage&&) = default;
+
+    /**
+     * A comparison operator is correctly defined if the type 'T' has a defined comparison
+     * operator. This is optional.
+     */
+    bool operator==(const Stage& other) const {
+        return contents == other.contents &&
+            (principalChild && other.principalChild ? *principalChild == *other.principalChild
+                                                    : !principalChild && !other.principalChild) &&
+            additionalChildren == other.additionalChildren;
+    }
+
+    T contents;
+
+    /**
+     * The child occurring directly before this stage in the pipeline. This is empty for the first
+     * Stage in any pipeline or sub-pipeline.
+ */ + std::unique_ptr principalChild; + + /** + * Additional children are the ends of sub-pipelines that feed into this stage. This vector is + * non-empty only for stages which operate on one or more sub-pipelines such as $facet. + */ + std::vector additionalChildren; +}; + +/** + * Following convention, the nested detail namespace should be treated as private and not accessed + * directly. + */ +namespace detail { +template +std::pair>, std::function> makeTreeWithOffTheEndStage( + std::deque&& initialStageContents, + const Pipeline& pipeline, + const std::function&, const DocumentSource&)>& propagator); + +/** + * Produces additional children to be included in a given Stage if it has sub-pipelines. Included + * are off-the-end contents that would be generated for those sub-pipelines if they had one + * additional Stage. A deque 'initialStageContents' is provided to dequeue from in order to populate + * sub-pipelines that, in a graph model, never source from the current pipeline but only feed into + * it. For example, the initial stage contents are used to seed the contents of $lookup + * sub-pipelines. The current Stage's contents are provided to copy and populate sub-pipelines that + * source from the current pipeline and feed back into it through a successive edge. For example, + * $facet sub-pipelines are populated using a copy of the current Stage's contents. + */ +template +inline auto makeAdditionalChildren( + std::deque&& initialStageContents, + const DocumentSource& source, + const std::function&, const DocumentSource&)>& propagator, + const T& currentContentsToCopyForFacet) { + std::vector> children; + std::vector offTheEndContents; + if (auto lookupSource = dynamic_cast(&source); + lookupSource && lookupSource->wasConstructedWithPipelineSyntax()) { + auto[child, offTheEndReshaper] = + makeTreeWithOffTheEndStage(std::move(initialStageContents), + lookupSource->getResolvedIntrospectionPipeline(), + propagator); + offTheEndContents.push_back(offTheEndReshaper(child.get().contents)); + children.push_back(std::move(*child)); + } + if (auto facetSource = dynamic_cast(&source)) + std::transform(facetSource->getFacetPipelines().begin(), + facetSource->getFacetPipelines().end(), + std::back_inserter(children), + [&](const auto& fPipe) { + initialStageContents.push_front(currentContentsToCopyForFacet); + auto[child, offTheEndReshaper] = makeTreeWithOffTheEndStage( + std::move(initialStageContents), *fPipe.pipeline, propagator); + offTheEndContents.push_back(offTheEndReshaper(child.get().contents)); + return std::move(*child); + }); + return std::pair(std::move(children), std::move(offTheEndContents)); +} + +/** + * Produces a stage and returns a function to determine the contents for the next Stage. Given are + * an optional reference to a previous stage which is disengaged at the start of a pipeline or sub- + * pipeline. Also given is 'reshapeContents', a function to produce the content of the current + * stage. The current DocumentSource to build a corresponding Stage for is given through 'source'. + * The front of the 'initialStageContents' deque is used to populate the new Stage if there is no + * previous Stage and is ignored otherwise. + */ +template +inline auto makeStage( + std::deque&& initialStageContents, + boost::optional>&& previous, + const std::function& reshapeContents, + const DocumentSource& source, + const std::function&, const DocumentSource&)>& propagator) { + auto contents = + previous ? 
+        previous ? reshapeContents(previous.get().contents) : initialStageContents.front();
+    if (!previous)
+        initialStageContents.pop_front();
+
+    auto [additionalChildren, offTheEndContents] =
+        makeAdditionalChildren(std::move(initialStageContents), source, propagator, contents);
+
+    auto principalChild = previous ? std::make_unique<Stage<T>>(std::move(previous.get()))
+                                   : std::unique_ptr<Stage<T>>();
+    std::function<T(const T&)> reshaper(
+        [&, offTheEndContents{std::move(offTheEndContents)}](const T& reshapable) {
+            return propagator(reshapable, offTheEndContents, source);
+        });
+    return std::pair(
+        boost::optional<Stage<T>>(
+            Stage(std::move(contents), std::move(principalChild), std::move(additionalChildren))),
+        std::move(reshaper));
+}
+
+template <typename T>
+inline std::pair<boost::optional<Stage<T>>, std::function<T(const T&)>> makeTreeWithOffTheEndStage(
+    std::deque<T>&& initialStageContents,
+    const Pipeline& pipeline,
+    const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator) {
+    std::pair<boost::optional<Stage<T>>, std::function<T(const T&)>> stageAndReshapeContents;
+    for (const auto& source : pipeline.getSources())
+        stageAndReshapeContents = makeStage(std::move(initialStageContents),
+                                            std::move(stageAndReshapeContents.first),
+                                            stageAndReshapeContents.second,
+                                            *source,
+                                            propagator);
+    return std::move(stageAndReshapeContents);
+}
+
+template <typename T>
+inline void walk(Stage<T>* stage,
+                 Pipeline::SourceContainer::iterator* sourceIter,
+                 const std::function<void(Stage<T>*, DocumentSource*)>& zipper) {
+    if (stage->principalChild)
+        walk(stage->principalChild.get(), sourceIter, zipper);
+
+    if (auto lookupSource = dynamic_cast<DocumentSourceLookUp*>(&***sourceIter);
+        lookupSource && lookupSource->wasConstructedWithPipelineSyntax()) {
+        auto iter = lookupSource->getResolvedIntrospectionPipeline().getSources().begin();
+        walk(&stage->additionalChildren.front(), &iter, zipper);
+    }
+
+    if (auto facetSource = dynamic_cast<DocumentSourceFacet*>(&***sourceIter)) {
+        auto facetIter = facetSource->getFacetPipelines().begin();
+        for (auto& child : stage->additionalChildren) {
+            auto iter = facetIter++->pipeline->getSources().begin();
+            walk(&child, &iter, zipper);
+        }
+    }
+
+    zipper(stage, &**(*sourceIter)++);
+}
+
+}  // namespace detail
+
+/**
+ * Builds a Stage from a Pipeline. Initial contents for the first pipeline stage must be placed in
+ * 'initialStageContents'; any expressive $lookup pipelines require one additional initial content
+ * element in this queue. A function 'propagator' is necessary to determine how to build the
+ * contents of all further stages. A stage will receive the built contents from its directly
+ * preceding stage.
+ *
+ * The arguments to 'propagator' will be actualized with the following:
+ * 'T&' - In general, the contents from the previous stage. Initial stages of the main pipeline and
+ * $lookup pipelines receive an element off the queue 'initialStageContents'. $facet receives a
+ * copy of its parent's contents.
+ * 'std::vector<T>&' - Completed contents from sub-pipelines. $facet's additional children and
+ * expressive $lookup's final contents will be manifested in here. Note that these are
+ * "off-the-end", that is, constructed from the final stage of a sub-pipeline and not actually
+ * contained in that pipeline. This vector is empty for most stages, which have only one child.
+ * 'DocumentSource&' - the current stage of the 'pipeline' a Stage object is being built for.
+ */ +template +inline Stage makeTree( + std::deque&& initialStageContents, + const Pipeline& pipeline, + const std::function&, const DocumentSource&)>& propagator) { + return *detail::makeTreeWithOffTheEndStage( + std::move(initialStageContents), pipeline, propagator) + .first; +} + +/** + * Walk a PipelineMetadataTree along with a Pipeline. Passes each Stage and its corresponding + * DocumentSource to 'zipper' two-by-two. + */ +template +inline void zip(Stage* tree, + Pipeline* pipeline, + const std::function*, DocumentSource*)>& zipper) { + auto iter = pipeline->getSources().begin(); + detail::walk(tree, &iter, zipper); +} + +} // namespace mongo::AggregatePipelineSchemaTracker diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp new file mode 100644 index 00000000000..46a816ad22e --- /dev/null +++ b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp @@ -0,0 +1,363 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "mongo/base/string_data.h" +#include "mongo/bson/json.h" +#include "mongo/bson/json.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/aggregation_request.h" +#include "mongo/db/pipeline/document_source_bucket_auto.h" +#include "mongo/db/pipeline/document_source_facet.h" +#include "mongo/db/pipeline/document_source_graph_lookup.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_lookup.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_tee_consumer.h" +#include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/pipeline_metadata_tree.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" + +#define ASSERT_DOES_NOT_THROW(EXPRESSION) \ + try { \ + EXPRESSION; \ + } catch (const AssertionException& e) { \ + ::mongoutils::str::stream err; \ + err << "Threw an exception incorrectly: " << e.toString() \ + << " Exception occured in: " << #EXPRESSION; \ + ::mongo::unittest::TestAssertionFailure(__FILE__, __LINE__, err).stream(); \ + } + +namespace mongo { +namespace { + +class PipelineMetadataTreeTest : public AggregationContextFixture { +protected: + auto jsonToPipeline(StringData jsonArray) { + const auto inputBson = fromjson("{pipeline: " + jsonArray + "}"); + + ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array); + auto rawPipeline = + uassertStatusOK(AggregationRequest::parsePipelineFromBSON(inputBson["pipeline"])); + NamespaceString testNss("test", "collection"); + AggregationRequest request(testNss, rawPipeline); + + return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx())); + } + + template + std::vector make_vector(Args&&... 
+    std::vector<T> make_vector(Args&&... args) {
+        std::vector<T> v;
+        v.reserve(sizeof...(Args));
+        (v.push_back(std::forward<Args>(args)), ...);
+        return v;
+    }
+
+    void introduceCollection(StringData collectionName) {
+        NamespaceString fromNs("test", collectionName);
+        getExpCtx()->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}});
+    }
+};
+
+using namespace pipeline_metadata_tree;
+
+TEST_F(PipelineMetadataTreeTest, LinearPipelinesConstructProperTrees) {
+    struct TestThing {
+        auto operator==(const TestThing& other) const {
+            return number == other.number;
+        }
+        int number;
+    } initial{23};
+    auto ignoreDocumentSourceAddOne =
+        [](const auto& previousThing, const auto&, const auto&) -> TestThing {
+        return {previousThing.number + 1};
+    };
+
+    auto makeUniqueStage = [&](auto&& contents,
+                               std::unique_ptr<Stage<TestThing>> principalChild,
+                               std::vector<Stage<TestThing>>&& additionalChildren) {
+        return std::make_unique<Stage<TestThing>>(
+            std::move(contents), std::move(principalChild), std::move(additionalChildren));
+    };
+
+    ASSERT([&]() {
+        auto pipePtr = jsonToPipeline("[{$project: {name : 1}}]");
+        return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+    }() == Stage(TestThing{23}, {}, {}));
+
+    ASSERT([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$project: {name: 1, status: 1}}, "
+            "{$match: {status: \"completed\"}}]");
+        return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+    }() == Stage(TestThing{24}, makeUniqueStage(TestThing{23}, {}, {}), {}));
+
+    ASSERT([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$project: {name: 1, status: 1}}, "
+            "{$match: {status: \"completed\"}}, "
+            "{$match: {status: \"completed\"}}, "
+            "{$match: {status: \"completed\"}}, "
+            "{$match: {status: \"completed\"}}, "
+            "{$match: {status: \"completed\"}}]");
+        return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+    }() == Stage(TestThing{28},
+                 makeUniqueStage(
+                     TestThing{27},
+                     makeUniqueStage(
+                         TestThing{26},
+                         makeUniqueStage(TestThing{25},
+                                         makeUniqueStage(TestThing{24},
+                                                         makeUniqueStage(TestThing{23}, {}, {}),
+                                                         {}),
+                                         {}),
+                         {}),
+                     {}),
+                 {}));
+}
+
+
+TEST_F(PipelineMetadataTreeTest, BranchingPipelinesConstructProperTrees) {
+    struct TestThing {
+        auto operator==(const TestThing& other) const {
+            return string == other.string;
+        }
+        std::string string;
+    };
+
+    auto makeUniqueStage = [&](auto&& contents,
+                               std::unique_ptr<Stage<TestThing>> principalChild,
+                               std::vector<Stage<TestThing>>&& additionalChildren) {
+        return std::make_unique<Stage<TestThing>>(
+            std::move(contents), std::move(principalChild), std::move(additionalChildren));
+    };
+
+    // Builds a string representation of stages leading up to the current stage. This is done by
+    // concatenating a character representing the current stage to the string from the previous
+    // stage. In addition, lookup and facet append a string containing each of the off-the-end
+    // strings from their sub-pipelines.
+    auto buildRepresentativeString = [](const auto& previousThing,
+                                        const auto& extraThings,
+                                        const DocumentSource& source) -> TestThing {
+        if (typeid(source) == typeid(DocumentSourceMatch))
+            return {previousThing.string + "m"};
+        if (typeid(source) == typeid(DocumentSourceSingleDocumentTransformation))
+            return {previousThing.string + "p"};
+        if (typeid(source) == typeid(DocumentSourceGraphLookUp))
+            return {previousThing.string + "x"};
+        if (typeid(source) == typeid(DocumentSourceUnwind))
+            return {previousThing.string + "u"};
+        if (typeid(source) == typeid(DocumentSourceGroup))
+            return {previousThing.string + "g"};
+        if (auto lookupSource = dynamic_cast<const DocumentSourceLookUp*>(&source)) {
+            if (lookupSource->wasConstructedWithPipelineSyntax())
+                return {previousThing.string + "l[" + extraThings.front().string + "]"};
+            else
+                return {previousThing.string + "l"};
+        }
+        if (typeid(source) == typeid(DocumentSourceFacet))
+            return {previousThing.string + "f[" +
+                    std::accumulate(std::next(extraThings.begin()),
+                                    extraThings.end(),
+                                    extraThings.front().string,
+                                    [](auto l, auto r) { return l + ", " + r.string; }) +
+                    "]"};
+        if (typeid(source) == typeid(DocumentSourceTeeConsumer))
+            return {previousThing.string + "t"};
+        if (typeid(source) == typeid(DocumentSourceSort))
+            return {previousThing.string + "s"};
+        if (typeid(source) == typeid(DocumentSourceBucketAuto))
+            return {previousThing.string + "b"};
+        if (typeid(source) == typeid(DocumentSourceLimit))
+            return {previousThing.string + "#"};
+        return {previousThing.string + "?"};
+    };
+
+    introduceCollection("folios");
+    introduceCollection("trades");
+    introduceCollection("instruments");
+
+    ASSERT([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$match: {ident: {$in: [12345]}}}, "
+            "{$project: {_id: 0, ident: 1}}, "
+            "{$graphLookup: {from: \"folios\", startWith: 12345, connectFromField: \"ident\", "
+            "connectToField: \"mgr\", as: \"sub_positions\", maxDepth: 100}}, "
+            "{$unwind: \"$sub_positions\"}, "
+            "{$lookup: {from: \"trades\", as: \"trade\", let: {sp: \"sub_positions.ident\"}, "
+            "pipeline: [{$match: {$expr: {$eq: [\"$$sp\", \"$opcvm\"]}}}]}}, "
+            "{$unwind: \"$trade\"}, "
+            "{$lookup: {from: \"instruments\", as: \"instr\", localField: \"trade.sicovam\", "
+            "foreignField: \"sicovam\"}}, "
+            "{$unwind: \"$instr\"}, "
+            "{$group: {_id: {PositionID: \"$trade.mvtident\", \"InstrumentReference\": "
+            "\"$instr.libelle\"}, NumberOfSecurities: {$sum:\"$trade.quantite\"}}}]");
+        return makeTree<TestThing>({{"1"}, {"2"}}, *pipePtr, buildRepresentativeString);
+    }() == Stage(TestThing{"1mpxul[2m]ulu"},
+                 makeUniqueStage(
+                     TestThing{"1mpxul[2m]ul"},
+                     makeUniqueStage(
+                         TestThing{"1mpxul[2m]u"},
+                         makeUniqueStage(
+                             TestThing{"1mpxul[2m]"},
+                             makeUniqueStage(
+                                 TestThing{"1mpxu"},
+                                 makeUniqueStage(
+                                     TestThing{"1mpx"},
+                                     makeUniqueStage(
+                                         TestThing{"1mp"},
+                                         makeUniqueStage(TestThing{"1m"},
+                                                         makeUniqueStage(TestThing{"1"}, {}, {}),
+                                                         {}),
+                                         {}),
+                                     {}),
+                                 {}),
+                             make_vector<Stage<TestThing>>(Stage(TestThing{"2"}, {}, {}))),
+                         {}),
+                     {}),
+                 {}));
+
+    ASSERT([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$facet:{"
+            "categorizedByTags: "
+            "[{$unwind: \"$tags\"}, {$sortByCount: \"$tags\"}], "
+            "categorizedByYears: [{$match: { year: {$exists: 1}}}, "
+            "{$bucket: {groupBy: \"$year\", boundaries: [ 2000, 2010, 2015, 2020]}}], "
+            "\"categorizedByYears(Auto)\": [{$bucketAuto: {groupBy: \"$year\", buckets: 2}}]}}, "
+            "{$limit: 12}]");
+        return makeTree<TestThing>({{""}}, *pipePtr, buildRepresentativeString);
+    }() == Stage(TestThing{"f[tugs, tmgs, tb]"},
+                 makeUniqueStage(
+                     TestThing{""},
+                     {},
+                     make_vector<Stage<TestThing>>(
+                         Stage(TestThing{"tug"},
+                               makeUniqueStage(
+                                   TestThing{"tu"},
+                                   makeUniqueStage(
+                                       TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}),
+                                   {}),
+                               {}),
+                         Stage(TestThing{"tmg"},
+                               makeUniqueStage(
+                                   TestThing{"tm"},
+                                   makeUniqueStage(
+                                       TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}),
+                                   {}),
+                               {}),
+                         Stage(TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}))),
+                 {}));
+}
+
+TEST_F(PipelineMetadataTreeTest, ZipWalksAPipelineAndTreeInTandemAndInOrder) {
+    struct TestThing {
+        auto operator==(const TestThing& other) const {
+            return typeInfo == other.typeInfo;
+        }
+        const std::type_info* typeInfo = nullptr;
+    };
+
+    auto takeTypeInfo = [](const auto&, const auto&, const DocumentSource& source) -> TestThing {
+        return {&typeid(source)};
+    };
+
+    // The stack holds one element for each branch of the tree.
+    std::stack<const std::type_info*> previousStack;
+    // Verifies that we walk each branch from leaf upwards towards the root when invoking the
+    // zip() function, since we will throw if the top of the stack (which is the branch being
+    // actively walked) has a typeid which does not match the typeid of the previous stage.
+    auto tookTypeInfoOrThrow = [&previousStack](auto* stage, auto* source) {
+        for ([[maybe_unused]] auto&& child : stage->additionalChildren)
+            previousStack.pop();
+        if (!stage->principalChild)
+            previousStack.push(nullptr);
+        if (auto typeInfo = stage->contents.typeInfo;
+            (previousStack.top() && typeInfo && *previousStack.top() != *typeInfo) ||
+            (previousStack.top() && !typeInfo) || (!previousStack.top() && typeInfo))
+            uasserted(51163, "Walk did not proceed in expected order!");
+        previousStack.top() = &typeid(*source);
+    };
+
+    introduceCollection("folios");
+    introduceCollection("trades");
+    introduceCollection("instruments");
+
+    ASSERT_DOES_NOT_THROW([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$match: {ident: {$in: [12345]}}}, "
+            "{$project: {_id: 0, ident: 1}}, "
+            "{$graphLookup: {from: \"folios\", startWith: 12345, connectFromField: \"ident\", "
+            "connectToField: \"mgr\", as: \"sub_positions\", maxDepth: 100}}, "
+            "{$unwind: \"$sub_positions\"}, "
+            "{$lookup: {from: \"trades\", as: \"trade\", let: {sp: \"sub_positions.ident\"}, "
+            "pipeline: [{$match: {$expr: {$eq: [\"$$sp\", \"$opcvm\"]}}}]}}, "
+            "{$unwind: \"$trade\"}, "
+            "{$lookup: {from: \"instruments\", as: \"instr\", localField: \"trade.sicovam\", "
+            "foreignField: \"sicovam\"}}, "
+            "{$unwind: \"$instr\"}, "
+            "{$group: {_id: {PositionID: \"$trade.mvtident\", \"InstrumentReference\": "
+            "\"$instr.libelle\"}, NumberOfSecurities: {$sum:\"$trade.quantite\"}}}]");
+        auto tree = makeTree<TestThing>({{}, {}}, *pipePtr, takeTypeInfo);
+        zip(&tree, &*pipePtr, tookTypeInfoOrThrow);
+        previousStack.pop();
+    }());
+
+    ASSERT_DOES_NOT_THROW([&]() {
+        auto pipePtr = jsonToPipeline(
+            "[{$facet:{"
+            "categorizedByTags: "
+            "[{$unwind: \"$tags\"}, {$sortByCount: \"$tags\"}], "
+            "categorizedByYears: [{$match: { year: {$exists: 1}}}, "
+            "{$bucket: {groupBy: \"$year\", boundaries: [ 2000, 2010, 2015, 2020]}}], "
+            "\"categorizedByYears(Auto)\": [{$bucketAuto: {groupBy: \"$year\", buckets: 2}}]}}, "
+            "{$limit: 12}]");
+        auto tree = makeTree<TestThing>({{}, {}}, *pipePtr, takeTypeInfo);
+        zip(&tree, &*pipePtr, tookTypeInfoOrThrow);
+        previousStack.pop();
+    }());
+}
+
+}  // namespace
+}  // namespace mongo
-- 
cgit v1.2.1
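
A companion sketch for zip(), under the same assumptions as the makeTree
sketch above ('StageCount' is the same hypothetical contents type; the tree
and pipeline are built elsewhere). It pairs each Stage with its corresponding
DocumentSource and prints the metadata next to the stage name; note that
zip() visits sub-pipeline branches before the stage that owns them, mirroring
the order in which makeTree() built the tree.

    #include <iostream>

    #include "mongo/db/pipeline/pipeline.h"
    #include "mongo/db/pipeline/pipeline_metadata_tree.h"

    namespace mongo {

    // Walks 'tree' and 'pipeline' in tandem, printing each stage's name
    // alongside the stage count computed for it by the propagator.
    void printStageCounts(pipeline_metadata_tree::Stage<StageCount>* tree, Pipeline* pipeline) {
        pipeline_metadata_tree::zip<StageCount>(
            tree,
            pipeline,
            [](pipeline_metadata_tree::Stage<StageCount>* stage, DocumentSource* source) {
                std::cout << source->getSourceName() << ": " << stage->contents.count << "\n";
            });
    }

    }  // namespace mongo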