summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Evans <jacob.evans@10gen.com>2019-03-20 11:42:07 -0400
committerJacob Evans <jacob.evans@10gen.com>2019-04-05 17:31:12 -0400
commit20c33aa11ae2925d66b2df81e2ddb6813d81400a (patch)
tree932795741f4167e6e0fd58b9a747cbb4e1740532
parentbff55ab3a32b7ee2f920c5a3aebb0eb9d42546c5 (diff)
downloadmongo-20c33aa11ae2925d66b2df81e2ddb6813d81400a.tar.gz
SERVER-40312 Create a generic tree for pipeline metatdata
-rw-r--r--src/mongo/db/pipeline/SConscript9
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h4
-rw-r--r--src/mongo/db/pipeline/pipeline.h4
-rw-r--r--src/mongo/db/pipeline/pipeline_metadata_tree.h279
-rw-r--r--src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp363
6 files changed, 663 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e9183efb0eb..19c0dce4418 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -615,3 +615,12 @@ env.CppUnitTest(
'document_sources_idl',
],
)
+
+env.CppUnitTest(
+ target='pipeline_metadata_tree_test',
+ source='pipeline_metadata_tree_test.cpp',
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
+ 'pipeline',
+ ],
+)
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 8c82ecf7cb1..74281663fb6 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -134,6 +134,10 @@ public:
return _facets;
}
+ auto& getFacetPipelines() {
+ return _facets;
+ }
+
// The following are overridden just to forward calls to sub-pipelines.
void addInvolvedCollections(stdx::unordered_set<NamespaceString>* involvedNssSet) const final;
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index f53dbcee3ad..1718ad5dc63 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -177,6 +177,10 @@ public:
return *_resolvedIntrospectionPipeline;
}
+ auto& getResolvedIntrospectionPipeline() {
+ return *_resolvedIntrospectionPipeline;
+ }
+
const Variables& getVariables_forTest() {
return _variables;
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 849143dd94e..05e2b77977d 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -278,6 +278,10 @@ public:
return _sources;
}
+ SourceContainer& getSources() {
+ return _sources;
+ }
+
/**
* Removes and returns the first stage of the pipeline. Returns nullptr if the pipeline is
* empty.
diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree.h b/src/mongo/db/pipeline/pipeline_metadata_tree.h
new file mode 100644
index 00000000000..e444ecf2734
--- /dev/null
+++ b/src/mongo/db/pipeline/pipeline_metadata_tree.h
@@ -0,0 +1,279 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <boost/optional.hpp>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/pipeline.h"
+
+/**
+ * A simple representation of an Aggregation Pipeline and functions for building it.
+ * PipelineMetadataTree has no default contents and reflects only the shape of stages. Each stage
+ * can be occupied by a a specified type in order to have the tree carry any form of arbitrary
+ * metadata. A PipelineMetadataTree is intended to be zipped along with another representation of a
+ * pipeline in order to supplement the other representation's metadata.
+ */
+namespace mongo::pipeline_metadata_tree {
+
+/**
+ * An alternate representation of a stage in an Aggregation Pipeline which contains handles to all
+ * stages it depends on, forming a tree. Each Stage tracks a specific piece of metadata of type 'T'.
+ * Since Stage forms a tree rather than a DAG, there are no handles from $facet component pipelines
+ * to their owning $facet stage but there exist pointers from any $facet stage to its component
+ * pipelines.
+ */
+template <typename T>
+struct Stage {
+ /**
+ * Construct an individual Stage from its components.
+ */
+ Stage(T&& contents,
+ std::unique_ptr<Stage> principalChild,
+ std::vector<Stage>&& additionalChildren)
+ : contents(std::move(contents)),
+ principalChild(std::move(principalChild)),
+ additionalChildren(std::move(additionalChildren)) {}
+
+ /**
+ * Specification of the move constructor intentionally inhibits compiler generation of a copy
+ * constructor. This is intentional since accidental copies could be deterimental for
+ * performance. This constructor is correctly formed only if the contents type 'T' also has a
+ * defined or defaulted move constructor. The correct definition of this constructor is
+ * essential for invoking 'makeTree'.
+ */
+ Stage(Stage&&) = default;
+
+ /**
+ * The move assignment operator is subject to the same conditions as the move constructor.
+ */
+ Stage& operator=(Stage&&) = default;
+
+ /**
+ * A comparison operator is correctly defined if the type 'T' has a defined comparison operator.
+ * This is optional.
+ */
+ bool operator==(const Stage& other) const {
+ return contents == other.contents &&
+ (principalChild && other.principalChild ? *principalChild == *other.principalChild
+ : !principalChild && !other.principalChild) &&
+ additionalChildren == other.additionalChildren;
+ }
+
+ T contents;
+
+ /**
+ * The child occuring directly before this stage in the pipeline. This is empty for the first
+ * Stage in any pipeline or sub-pipeline.
+ */
+ std::unique_ptr<Stage> principalChild;
+
+ /**
+ * Additional children are the ends of sub-pipelines that feed into this stage. This vector is
+ * non-empty only for stages which operate on one or more sub-pipelines such as $facet.
+ */
+ std::vector<Stage> additionalChildren;
+};
+
+/**
+ * Following convention, the nested detail namespace should be treated as private and not accessed
+ * directly.
+ */
+namespace detail {
+template <typename T>
+std::pair<boost::optional<Stage<T>>, std::function<T(const T&)>> makeTreeWithOffTheEndStage(
+ std::deque<T>&& initialStageContents,
+ const Pipeline& pipeline,
+ const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator);
+
+/**
+ * Produces additional children to be included in a given Stage if it has sub-pipelines. Included
+ * are off-the-end contents that would be generated for those sub-pipelines if they had one
+ * additional Stage. A deque 'initialStageContents' is provided to dequeue from in order to populate
+ * sub-pipelines that, in a graph model, never source from the current pipeline but only feed into
+ * it. For example, the initial stage contents are used to seed the contents of $lookup
+ * sub-pipelines. The current Stage's contents are provided to copy and populate sub-pipelines that
+ * source from the current pipeline and feed back into it through a successive edge. For example,
+ * $facet sub-pipelines are populated using a copy of the current Stage's contents.
+ */
+template <typename T>
+inline auto makeAdditionalChildren(
+ std::deque<T>&& initialStageContents,
+ const DocumentSource& source,
+ const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator,
+ const T& currentContentsToCopyForFacet) {
+ std::vector<Stage<T>> children;
+ std::vector<T> offTheEndContents;
+ if (auto lookupSource = dynamic_cast<const DocumentSourceLookUp*>(&source);
+ lookupSource && lookupSource->wasConstructedWithPipelineSyntax()) {
+ auto[child, offTheEndReshaper] =
+ makeTreeWithOffTheEndStage(std::move(initialStageContents),
+ lookupSource->getResolvedIntrospectionPipeline(),
+ propagator);
+ offTheEndContents.push_back(offTheEndReshaper(child.get().contents));
+ children.push_back(std::move(*child));
+ }
+ if (auto facetSource = dynamic_cast<const DocumentSourceFacet*>(&source))
+ std::transform(facetSource->getFacetPipelines().begin(),
+ facetSource->getFacetPipelines().end(),
+ std::back_inserter(children),
+ [&](const auto& fPipe) {
+ initialStageContents.push_front(currentContentsToCopyForFacet);
+ auto[child, offTheEndReshaper] = makeTreeWithOffTheEndStage(
+ std::move(initialStageContents), *fPipe.pipeline, propagator);
+ offTheEndContents.push_back(offTheEndReshaper(child.get().contents));
+ return std::move(*child);
+ });
+ return std::pair(std::move(children), std::move(offTheEndContents));
+}
+
+/**
+ * Produces a stage and returns a function to determine the contents for the next Stage. Given are
+ * an optional reference to a previous stage which is disengaged at the start of a pipeline or sub-
+ * pipeline. Also given is 'reshapeContents', a function to produce the content of the current
+ * stage. The current DocumentSource to build a corresponding Stage for is given through 'source'.
+ * The front of the 'initialStageContents' deque is used to populate the new Stage if there is no
+ * previous Stage and is ignored otherwise.
+ */
+template <typename T>
+inline auto makeStage(
+ std::deque<T>&& initialStageContents,
+ boost::optional<Stage<T>>&& previous,
+ const std::function<T(const T&)>& reshapeContents,
+ const DocumentSource& source,
+ const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator) {
+ auto contents =
+ previous ? reshapeContents(previous.get().contents) : initialStageContents.front();
+ if (!previous)
+ initialStageContents.pop_front();
+
+ auto[additionalChildren, offTheEndContents] =
+ makeAdditionalChildren(std::move(initialStageContents), source, propagator, contents);
+
+ auto principalChild = previous ? std::make_unique<Stage<T>>(std::move(previous.get()))
+ : std::unique_ptr<Stage<T>>();
+ std::function<T(const T&)> reshaper([&, offTheEndContents{std::move(offTheEndContents)} ](
+ const T& reshapable) { return propagator(reshapable, offTheEndContents, source); });
+ return std::pair(
+ boost::optional<Stage<T>>(
+ Stage(std::move(contents), std::move(principalChild), std::move(additionalChildren))),
+ std::move(reshaper));
+}
+
+template <typename T>
+inline std::pair<boost::optional<Stage<T>>, std::function<T(const T&)>> makeTreeWithOffTheEndStage(
+ std::deque<T>&& initialStageContents,
+ const Pipeline& pipeline,
+ const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator) {
+ std::pair<boost::optional<Stage<T>>, std::function<T(const T&)>> stageAndReshapeContents;
+ for (const auto& source : pipeline.getSources())
+ stageAndReshapeContents = makeStage(std::move(initialStageContents),
+ std::move(stageAndReshapeContents.first),
+ stageAndReshapeContents.second,
+ *source,
+ propagator);
+ return std::move(stageAndReshapeContents);
+}
+
+template <typename T>
+inline void walk(Stage<T>* stage,
+ Pipeline::SourceContainer::iterator* sourceIter,
+ const std::function<void(Stage<T>*, DocumentSource*)>& zipper) {
+ if (stage->principalChild)
+ walk(stage->principalChild.get(), sourceIter, zipper);
+
+ if (auto lookupSource = dynamic_cast<DocumentSourceLookUp*>(&***sourceIter);
+ lookupSource && lookupSource->wasConstructedWithPipelineSyntax()) {
+ auto iter = lookupSource->getResolvedIntrospectionPipeline().getSources().begin();
+ walk(&stage->additionalChildren.front(), &iter, zipper);
+ }
+
+ if (auto facetSource = dynamic_cast<const DocumentSourceFacet*>(&***sourceIter)) {
+ auto facetIter = facetSource->getFacetPipelines().begin();
+ for (auto& child : stage->additionalChildren) {
+ auto iter = facetIter++->pipeline->getSources().begin();
+ walk(&child, &iter, zipper);
+ }
+ }
+
+ zipper(stage, &**(*sourceIter)++);
+}
+
+} // namespace detail
+
+/**
+ * Builds a Stage from a Pipeline. Initial contents for the first pipline stage must be provided. A
+ * function 'propagator' is neccesary to determine how to build the contents of all further stages.
+ * A stage will receive the built contents from its directly preceding stage. Initial contents must
+ * be placed in 'initialStageContents'. Any expressive lookup pipelines require an additional
+ * initial content element in this queue.
+ *
+ * The arguments to propagator will be actualized with the following:
+ * 'T&' - In general, the contents from the previous stage, initial stages of the main pipeline and
+ * $lookup pipelines recieve an element off the queue 'initialStageContents'. $facet receives a copy
+ * of its parent's contents.
+ * 'std::vector<T>&' - Completed contents from sub-pipelines. $facet's additional children and
+ * expressive $lookup's final contents will be manifested in here. Note that these will be
+ * "off-the-end", that is constructed from the final stage of a sub-pipeline and not actually
+ * contained in that pipeline. This vector is empty for most stages which have only one child.
+ * 'DocumentSource&' - the current stage of the 'pipeline' a Stage object is being built for.
+ */
+template <typename T>
+inline Stage<T> makeTree(
+ std::deque<T>&& initialStageContents,
+ const Pipeline& pipeline,
+ const std::function<T(const T&, const std::vector<T>&, const DocumentSource&)>& propagator) {
+ return *detail::makeTreeWithOffTheEndStage(
+ std::move(initialStageContents), pipeline, propagator)
+ .first;
+}
+
+/**
+ * Walk a PipelineMetadataTree along with a Pipeline. Passes each Stage and its corresponding
+ * DocumentSource to 'zipper' two-by-two.
+ */
+template <typename T>
+inline void zip(Stage<T>* tree,
+ Pipeline* pipeline,
+ const std::function<void(Stage<T>*, DocumentSource*)>& zipper) {
+ auto iter = pipeline->getSources().begin();
+ detail::walk(tree, &iter, zipper);
+}
+
+} // namespace mongo::AggregatePipelineSchemaTracker
diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
new file mode 100644
index 00000000000..46a816ad22e
--- /dev/null
+++ b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
@@ -0,0 +1,363 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <functional>
+#include <memory>
+#include <numeric>
+#include <stack>
+#include <string>
+#include <typeinfo>
+#include <vector>
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/json.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/document_source_bucket_auto.h"
+#include "mongo/db/pipeline/document_source_facet.h"
+#include "mongo/db/pipeline/document_source_graph_lookup.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_tee_consumer.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/pipeline_metadata_tree.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+
+#define ASSERT_DOES_NOT_THROW(EXPRESSION) \
+ try { \
+ EXPRESSION; \
+ } catch (const AssertionException& e) { \
+ ::mongoutils::str::stream err; \
+ err << "Threw an exception incorrectly: " << e.toString() \
+ << " Exception occured in: " << #EXPRESSION; \
+ ::mongo::unittest::TestAssertionFailure(__FILE__, __LINE__, err).stream(); \
+ }
+
+namespace mongo {
+namespace {
+
+class PipelineMetadataTreeTest : public AggregationContextFixture {
+protected:
+ auto jsonToPipeline(StringData jsonArray) {
+ const auto inputBson = fromjson("{pipeline: " + jsonArray + "}");
+
+ ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array);
+ auto rawPipeline =
+ uassertStatusOK(AggregationRequest::parsePipelineFromBSON(inputBson["pipeline"]));
+ NamespaceString testNss("test", "collection");
+ AggregationRequest request(testNss, rawPipeline);
+
+ return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx()));
+ }
+
+ template <typename T, typename... Args>
+ std::vector<T> make_vector(Args&&... args) {
+ std::vector<T> v;
+ v.reserve(sizeof...(Args));
+ (v.push_back(std::forward<Args>(args)), ...);
+ return v;
+ }
+
+ void introduceCollection(StringData collectionName) {
+ NamespaceString fromNs("test", collectionName);
+ getExpCtx()->setResolvedNamespace_forTest(fromNs, {fromNs, std::vector<BSONObj>{}});
+ }
+};
+
+using namespace pipeline_metadata_tree;
+
+TEST_F(PipelineMetadataTreeTest, LinearPipelinesConstructProperTrees) {
+ struct TestThing {
+ auto operator==(const TestThing& other) const {
+ return number == other.number;
+ }
+ int number;
+ } initial{23};
+ auto ignoreDocumentSourceAddOne =
+ [](const auto& previousThing, const auto&, const auto&) -> TestThing {
+ return {previousThing.number + 1};
+ };
+
+ auto makeUniqueStage = [&](auto&& contents,
+ std::unique_ptr<Stage<TestThing>> principalChild,
+ std::vector<Stage<TestThing>>&& additionalChildren) {
+ return std::make_unique<Stage<TestThing>>(
+ std::move(contents), std::move(principalChild), std::move(additionalChildren));
+ };
+
+ ASSERT([&]() {
+ auto pipePtr = jsonToPipeline("[{$project: {name : 1}}]");
+ return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+ }() == Stage(TestThing{23}, {}, {}));
+
+ ASSERT([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$project: {name: 1, status: 1}}, "
+ "{$match: {status: \"completed\"}}]");
+ return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+ }() == Stage(TestThing{24}, makeUniqueStage(TestThing{23}, {}, {}), {}));
+
+ ASSERT([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$project: {name: 1, status: 1}}, "
+ "{$match: {status: \"completed\"}}, "
+ "{$match: {status: \"completed\"}}, "
+ "{$match: {status: \"completed\"}}, "
+ "{$match: {status: \"completed\"}}, "
+ "{$match: {status: \"completed\"}}]");
+ return makeTree<TestThing>({initial}, *pipePtr, ignoreDocumentSourceAddOne);
+ }() == Stage(TestThing{28},
+ makeUniqueStage(
+ TestThing{27},
+ makeUniqueStage(
+ TestThing{26},
+ makeUniqueStage(TestThing{25},
+ makeUniqueStage(TestThing{24},
+ makeUniqueStage(TestThing{23}, {}, {}),
+ {}),
+ {}),
+ {}),
+ {}),
+ {}));
+}
+
+
+TEST_F(PipelineMetadataTreeTest, BranchingPipelinesConstructProperTrees) {
+ struct TestThing {
+ auto operator==(const TestThing& other) const {
+ return string == other.string;
+ }
+ std::string string;
+ };
+
+ auto makeUniqueStage = [&](auto&& contents,
+ std::unique_ptr<Stage<TestThing>> principalChild,
+ std::vector<Stage<TestThing>>&& additionalChildren) {
+ return std::make_unique<Stage<TestThing>>(
+ std::move(contents), std::move(principalChild), std::move(additionalChildren));
+ };
+
+ // Builds a string representation of stages leading up to the current stage. This is done by
+ // concatenating a character representing the current stage to the string from the previous
+ // stage. In addition, lookup and facet append a string containing each of the off-the-end
+ // strings from their sub-pipelines.
+ auto buildRepresentativeString = [](const auto& previousThing,
+ const auto& extraThings,
+ const DocumentSource& source) -> TestThing {
+ if (typeid(source) == typeid(DocumentSourceMatch))
+ return {previousThing.string + "m"};
+ if (typeid(source) == typeid(DocumentSourceSingleDocumentTransformation))
+ return {previousThing.string + "p"};
+ if (typeid(source) == typeid(DocumentSourceGraphLookUp))
+ return {previousThing.string + "x"};
+ if (typeid(source) == typeid(DocumentSourceUnwind))
+ return {previousThing.string + "u"};
+ if (typeid(source) == typeid(DocumentSourceGroup))
+ return {previousThing.string + "g"};
+ if (auto lookupSource = dynamic_cast<const DocumentSourceLookUp*>(&source)) {
+ if (lookupSource->wasConstructedWithPipelineSyntax())
+ return {previousThing.string + "l[" + extraThings.front().string + "]"};
+ else
+ return {previousThing.string + "l"};
+ }
+ if (typeid(source) == typeid(DocumentSourceFacet))
+ return {previousThing.string + "f[" +
+ std::accumulate(std::next(extraThings.begin()),
+ extraThings.end(),
+ extraThings.front().string,
+ [](auto l, auto r) { return l + ", " + r.string; }) +
+ "]"};
+ if (typeid(source) == typeid(DocumentSourceTeeConsumer))
+ return {previousThing.string + "t"};
+ if (typeid(source) == typeid(DocumentSourceSort))
+ return {previousThing.string + "s"};
+ if (typeid(source) == typeid(DocumentSourceBucketAuto))
+ return {previousThing.string + "b"};
+ if (typeid(source) == typeid(DocumentSourceLimit))
+ return {previousThing.string + "#"};
+ return {previousThing.string + "?"};
+ };
+
+ introduceCollection("folios");
+ introduceCollection("trades");
+ introduceCollection("instruments");
+
+ ASSERT([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$match: {ident: {$in: [12345]}}}, "
+ "{$project: {_id: 0, ident: 1}}, "
+ "{$graphLookup: {from: \"folios\", startWith: 12345, connectFromField: \"ident\", "
+ "connectToField: \"mgr\", as: \"sub_positions\", maxDepth: 100}}, "
+ "{$unwind: \"$sub_positions\"}, "
+ "{$lookup: {from: \"trades\", as: \"trade\", let: {sp: \"sub_positions.ident\"}, "
+ "pipeline: [{$match: {$expr: {$eq: [\"$$sp\", \"$opcvm\"]}}}]}}, "
+ "{$unwind: \"$trade\"}, "
+ "{$lookup: {from: \"instruments\", as: \"instr\", localField: \"trade.sicovam\", "
+ "foreignField: \"sicovam\"}}, "
+ "{$unwind: \"$instr\"}, "
+ "{$group: {_id: {PositionID: \"$trade.mvtident\", \"InstrumentReference\": "
+ "\"$instr.libelle\"}, NumberOfSecurities: {$sum:\"$trade.quantite\"}}}]");
+ return makeTree<TestThing>({{"1"}, {"2"}}, *pipePtr, buildRepresentativeString);
+ }() == Stage(TestThing{"1mpxul[2m]ulu"},
+ makeUniqueStage(
+ TestThing{"1mpxul[2m]ul"},
+ makeUniqueStage(
+ TestThing{"1mpxul[2m]u"},
+ makeUniqueStage(
+ TestThing{"1mpxul[2m]"},
+ makeUniqueStage(
+ TestThing{"1mpxu"},
+ makeUniqueStage(
+ TestThing{"1mpx"},
+ makeUniqueStage(
+ TestThing{"1mp"},
+ makeUniqueStage(TestThing{"1m"},
+ makeUniqueStage(TestThing{"1"}, {}, {}),
+ {}),
+ {}),
+ {}),
+ make_vector<Stage<TestThing>>(Stage(TestThing{"2"}, {}, {}))),
+ {}),
+ {}),
+ {}),
+ {}));
+
+ ASSERT([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$facet:{"
+ "categorizedByTags: "
+ "[{$unwind: \"$tags\"}, {$sortByCount: \"$tags\"}], "
+ "categorizedByYears: [{$match: { year: {$exists: 1}}}, "
+ "{$bucket: {groupBy: \"$year\", boundaries: [ 2000, 2010, 2015, 2020]}}], "
+ "\"categorizedByYears(Auto)\": [{$bucketAuto: {groupBy: \"$year\", buckets: 2}}]}}, "
+ "{$limit: 12}]");
+ return makeTree<TestThing>({{""}}, *pipePtr, buildRepresentativeString);
+ }() == Stage(TestThing{"f[tugs, tmgs, tb]"},
+ makeUniqueStage(
+ TestThing{""},
+ {},
+ make_vector<Stage<TestThing>>(
+ Stage(TestThing{"tug"},
+ makeUniqueStage(
+ TestThing{"tu"},
+ makeUniqueStage(
+ TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}),
+ {}),
+ {}),
+ Stage(TestThing{"tmg"},
+ makeUniqueStage(
+ TestThing{"tm"},
+ makeUniqueStage(
+ TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}),
+ {}),
+ {}),
+ Stage(TestThing{"t"}, makeUniqueStage(TestThing{""}, {}, {}), {}))),
+ {}));
+}
+
+TEST_F(PipelineMetadataTreeTest, ZipWalksAPipelineAndTreeInTandemAndInOrder) {
+ struct TestThing {
+ auto operator==(const TestThing& other) const {
+ return typeInfo == other.typeInfo;
+ }
+ const std::type_info* typeInfo = nullptr;
+ };
+
+ auto takeTypeInfo = [](const auto&, const auto&, const DocumentSource& source) -> TestThing {
+ return {&typeid(source)};
+ };
+
+ // The stack holds one element for each branch of the tree.
+ std::stack<const std::type_info*> previousStack;
+ // Verifies that we walk each branch from leaf upwards towards the root when invoking the zip()
+ // function, since we will throw if the top of the stack (which is the branch being actively
+ // walked) has a typeid which does not match the typeid of the previous stage.
+ auto tookTypeInfoOrThrow = [&previousStack](auto* stage, auto* source) {
+ for ([[maybe_unused]] auto&& child : stage->additionalChildren)
+ previousStack.pop();
+ if (!stage->principalChild)
+ previousStack.push(nullptr);
+ if (auto typeInfo = stage->contents.typeInfo;
+ (previousStack.top() && typeInfo && *previousStack.top() != *typeInfo) ||
+ (previousStack.top() && !typeInfo) || (!previousStack.top() && typeInfo))
+ uasserted(51163, "Walk did not proceed in expected order!");
+ previousStack.top() = &typeid(*source);
+ };
+
+ introduceCollection("folios");
+ introduceCollection("trades");
+ introduceCollection("instruments");
+
+ ASSERT_DOES_NOT_THROW([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$match: {ident: {$in: [12345]}}}, "
+ "{$project: {_id: 0, ident: 1}}, "
+ "{$graphLookup: {from: \"folios\", startWith: 12345, connectFromField: \"ident\", "
+ "connectToField: \"mgr\", as: \"sub_positions\", maxDepth: 100}}, "
+ "{$unwind: \"$sub_positions\"}, "
+ "{$lookup: {from: \"trades\", as: \"trade\", let: {sp: \"sub_positions.ident\"}, "
+ "pipeline: [{$match: {$expr: {$eq: [\"$$sp\", \"$opcvm\"]}}}]}}, "
+ "{$unwind: \"$trade\"}, "
+ "{$lookup: {from: \"instruments\", as: \"instr\", localField: \"trade.sicovam\", "
+ "foreignField: \"sicovam\"}}, "
+ "{$unwind: \"$instr\"}, "
+ "{$group: {_id: {PositionID: \"$trade.mvtident\", \"InstrumentReference\": "
+ "\"$instr.libelle\"}, NumberOfSecurities: {$sum:\"$trade.quantite\"}}}]");
+ auto tree = makeTree<TestThing>({{}, {}}, *pipePtr, takeTypeInfo);
+ zip<TestThing>(&tree, &*pipePtr, tookTypeInfoOrThrow);
+ previousStack.pop();
+ }());
+
+ ASSERT_DOES_NOT_THROW([&]() {
+ auto pipePtr = jsonToPipeline(
+ "[{$facet:{"
+ "categorizedByTags: "
+ "[{$unwind: \"$tags\"}, {$sortByCount: \"$tags\"}], "
+ "categorizedByYears: [{$match: { year: {$exists: 1}}}, "
+ "{$bucket: {groupBy: \"$year\", boundaries: [ 2000, 2010, 2015, 2020]}}], "
+ "\"categorizedByYears(Auto)\": [{$bucketAuto: {groupBy: \"$year\", buckets: 2}}]}}, "
+ "{$limit: 12}]");
+ auto tree = makeTree<TestThing>({{}, {}}, *pipePtr, takeTypeInfo);
+ zip<TestThing>(&tree, &*pipePtr, tookTypeInfoOrThrow);
+ previousStack.pop();
+ }());
+}
+
+} // namespace
+} // namespace mongo