summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Evans <jacob.evans@mongodb.com>2020-01-17 18:49:44 +0000
committerA. Jesse Jiryu Davis <jesse@mongodb.com>2020-01-27 15:40:40 -0500
commitbe7dd575893b57b7c6e5d2a4cb61d42b0b5a42f2 (patch)
treead8b3135c04bb1f9f22c31806270ab53f79830e0
parentd0d041cb8422bb29c109ddd102b038423a529c17 (diff)
downloadmongo-be7dd575893b57b7c6e5d2a4cb61d42b0b5a42f2.tar.gz
SERVER-45399 Add basic union aggregation stage for unsharded collections
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp187
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.h134
-rw-r--r--src/mongo/db/pipeline/document_source_union_with_test.cpp300
4 files changed, 623 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index e9a4e66b9c2..ae5f56ae340 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -323,6 +323,7 @@ pipelineEnv.Library(
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
'document_source_tee_consumer.cpp',
+ 'document_source_union_with.cpp',
'document_source_unwind.cpp',
'pipeline.cpp',
'semantic_analysis.cpp',
@@ -447,6 +448,7 @@ env.CppUnitTest(
'document_source_skip_test.cpp',
'document_source_sort_by_count_test.cpp',
'document_source_sort_test.cpp',
+ 'document_source_union_with_test.cpp',
'document_source_unwind_test.cpp',
'expression_and_test.cpp',
'expression_compare_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
new file mode 100644
index 00000000000..fa478bf653f
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -0,0 +1,187 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_union_with.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(unionWith,
+ DocumentSourceUnionWith::LiteParsed::parse,
+ DocumentSourceUnionWith::createFromBson);
+
+std::unique_ptr<DocumentSourceUnionWith::LiteParsed> DocumentSourceUnionWith::LiteParsed::parse(
+ const AggregationRequest& request, const BSONElement& spec) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "the $unionWith stage specification must be an object or string, but found "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object || spec.type() == BSONType::String);
+
+ NamespaceString unionNss;
+ stdx::unordered_set<NamespaceString> foreignNssSet;
+ boost::optional<LiteParsedPipeline> liteParsedPipeline;
+ if (spec.type() == BSONType::String) {
+ unionNss = NamespaceString(request.getNamespaceString().db(), spec.valueStringData());
+ } else {
+ unionNss =
+ NamespaceString(request.getNamespaceString().db(), spec["coll"].valueStringData());
+
+ // Recursively lite parse the nested pipeline, if one exists.
+ if (auto pipelineElem = spec["pipeline"]) {
+ auto pipeline =
+ uassertStatusOK(AggregationRequest::parsePipelineFromBSON(pipelineElem));
+ AggregationRequest foreignAggReq(unionNss, std::move(pipeline));
+ liteParsedPipeline = LiteParsedPipeline(foreignAggReq);
+
+ foreignNssSet.merge(liteParsedPipeline->getInvolvedNamespaces());
+ }
+ }
+
+ foreignNssSet.insert(unionNss);
+
+ return std::make_unique<DocumentSourceUnionWith::LiteParsed>(
+ std::move(unionNss), std::move(foreignNssSet), std::move(liteParsedPipeline));
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream()
+ << "the $unionWith stage specification must be an object or string, but found "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Object || elem.type() == BSONType::String);
+
+ NamespaceString unionNss;
+ std::vector<BSONObj> pipeline;
+ if (elem.type() == BSONType::String) {
+ unionNss = NamespaceString(expCtx->ns.db().toString(), elem.valueStringData());
+ } else {
+ bool sawColl = false;
+ bool sawPipeline = false;
+ for (auto&& arg : elem.embeddedObject()) {
+ auto fieldName = arg.fieldNameStringData();
+ if (fieldName == "coll") {
+ if (sawColl)
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Redundant 'coll' argument given to $unionWith");
+ unionNss = NamespaceString(expCtx->ns.db().toString(), arg.valueStringData());
+ sawColl = true;
+ } else if (fieldName == "pipeline") {
+ if (sawPipeline)
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Redundant 'pipeline' argument given to $unionWith");
+ pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(arg));
+ sawPipeline = true;
+ } else
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Unknown argument given to $unionWith stage: " << fieldName);
+ }
+ if (!sawColl)
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "No 'coll' argument given to $unionWith");
+ }
+ return make_intrusive<DocumentSourceUnionWith>(
+ expCtx,
+ uassertStatusOK(
+ Pipeline::parse(std::move(pipeline), expCtx->copyWith(std::move(unionNss)))));
+}
+
+DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
+ if (!_sourceExhausted) {
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isEOF()) {
+ return nextInput;
+ }
+ _sourceExhausted = true;
+ // All documents from the base collection have been returned, switch to iterating the sub-
+ // pipeline by falling through below.
+ }
+
+ if (!_cursorAttached) {
+ auto ctx = _pipeline->getContext();
+ _pipeline =
+ pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(ctx, _pipeline.release());
+ _cursorAttached = true;
+ }
+
+ if (auto res = _pipeline->getNext())
+ return std::move(*res);
+
+ return GetNextResult::makeEOF();
+}
+
+DocumentSource::GetModPathsReturn DocumentSourceUnionWith::getModifiedPaths() const {
+ // Since we might have a document arrive from the foreign pipeline with the same path as a
+ // document in the main pipeline. Without introspecting the sub-pipeline, we must report that
+ // all paths have been modified.
+ return {GetModPathsReturn::Type::kAllPaths, {}, {}};
+}
+
+void DocumentSourceUnionWith::doDispose() {
+ _pipeline->dispose(pExpCtx->opCtx);
+ _pipeline.reset();
+}
+
+void DocumentSourceUnionWith::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ BSONArrayBuilder bab;
+ for (auto&& stage : _pipeline->serialize())
+ bab << stage;
+ Document doc = DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
+ << "pipeline" << bab.arr()));
+ array.push_back(Value(doc));
+}
+
+DepsTracker::State DocumentSourceUnionWith::getDependencies(DepsTracker* deps) const {
+ return DepsTracker::State::SEE_NEXT;
+}
+
+void DocumentSourceUnionWith::detachFromOperationContext() {
+ // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+ // use Pipeline::detachFromOperationContext() to take care of updating the Pipeline's
+ // ExpressionContext.
+ _pipeline->detachFromOperationContext();
+}
+
+void DocumentSourceUnionWith::reattachToOperationContext(OperationContext* opCtx) {
+ // We have a pipeline we're going to be executing across multiple calls to getNext(), so we
+ // use Pipeline::reattachToOperationContext() to take care of updating the Pipeline's
+ // ExpressionContext.
+ _pipeline->reattachToOperationContext(opCtx);
+}
+
+void DocumentSourceUnionWith::addInvolvedCollections(
+ stdx::unordered_set<NamespaceString>* collectionNames) const {
+ collectionNames->insert(_pipeline->getContext()->ns);
+ collectionNames->merge(_pipeline->getInvolvedCollections());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
new file mode 100644
index 00000000000..7181a7d2ab7
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+
+namespace mongo {
+
+class DocumentSourceUnionWith final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$unionWith"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
+ const BSONElement& spec);
+
+ LiteParsed(NamespaceString withNss,
+ stdx::unordered_set<NamespaceString> foreignNssSet,
+ boost::optional<LiteParsedPipeline> liteParsedPipeline)
+ : _withNss{std::move(withNss)},
+ _foreignNssSet(std::move(foreignNssSet)),
+ _liteParsedPipeline(std::move(liteParsedPipeline)) {}
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return {_foreignNssSet};
+ }
+
+ PrivilegeVector requiredPrivileges(bool) const final {
+ return {};
+ }
+
+ bool allowShardedForeignCollection(NamespaceString) const final {
+ return true;
+ }
+
+ bool allowedToPassthroughFromMongos() const final {
+ return true;
+ }
+
+ private:
+ const NamespaceString _withNss;
+ const stdx::unordered_set<NamespaceString> _foreignNssSet;
+ const boost::optional<LiteParsedPipeline> _liteParsedPipeline;
+ };
+
+ DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
+ : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {}
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ void serializeToArray(
+ std::vector<Value>& array,
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ GetModPathsReturn getModifiedPaths() const final;
+
+ StageConstraints constraints(Pipeline::SplitState) const final {
+ return StageConstraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed);
+ }
+
+ DepsTracker::State getDependencies(DepsTracker* deps) const final;
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return DistributedPlanLogic{nullptr, this, boost::none};
+ }
+
+ void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final;
+
+ void detachFromOperationContext() final;
+
+ void reattachToOperationContext(OperationContext* opCtx) final;
+
+protected:
+ GetNextResult doGetNext() final;
+ void doDispose() final;
+
+private:
+ /**
+ * Should not be called; use serializeToArray instead.
+ */
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
+ bool _sourceExhausted = false;
+ bool _cursorAttached = false;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
new file mode 100644
index 00000000000..d45e9ce4f87
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -0,0 +1,300 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <array>
+#include <deque>
+#include <list>
+#include <set>
+#include <vector>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/document_comparator.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_union_with.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/intrusive_counter.h"
+
+namespace mongo {
+namespace {
+
+using MockMongoInterface = StubMongoProcessInterfaceLookupSingleDocument;
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceUnionWithTest = AggregationContextFixture;
+
+TEST_F(DocumentSourceUnionWithTest, BasicSerialUnions) {
+ const auto docs = std::array{Document{{"a", 1}}, Document{{"b", 1}}, Document{{"c", 1}}};
+ const auto mock = DocumentSourceMock::createForTest(docs[0]);
+ const auto mockDequeOne = std::deque<DocumentSource::GetNextResult>{Document{docs[1]}};
+ const auto mockDequeTwo = std::deque<DocumentSource::GetNextResult>{Document{docs[2]}};
+ const auto mockCtxOne = getExpCtx()->copyWith({});
+ mockCtxOne->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeOne);
+ const auto mockCtxTwo = getExpCtx()->copyWith({});
+ mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
+ auto unionWithOne = DocumentSourceUnionWith(
+ mockCtxOne,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ auto unionWithTwo = DocumentSourceUnionWith(
+ mockCtxTwo,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ unionWithOne.setSource(mock.get());
+ unionWithTwo.setSource(&unionWithOne);
+
+ auto results = DocumentComparator().makeUnorderedDocumentSet();
+ for (auto& doc [[maybe_unused]] : docs) {
+ auto next = unionWithTwo.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ const auto [ignored, inserted] = results.insert(next.releaseDocument());
+ ASSERT_TRUE(inserted);
+ }
+ for (const auto& doc : docs)
+ ASSERT_TRUE(results.find(doc) != results.end());
+
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+}
+
+TEST_F(DocumentSourceUnionWithTest, BasicNestedUnions) {
+ const auto docs = std::array{Document{{"a", 1}}, Document{{"b", 1}}, Document{{"c", 1}}};
+ const auto mock = DocumentSourceMock::createForTest(docs[0]);
+ const auto mockDequeOne = std::deque<DocumentSource::GetNextResult>{Document{docs[1]}};
+ const auto mockDequeTwo = std::deque<DocumentSource::GetNextResult>{Document{docs[2]}};
+ const auto mockCtxOne = getExpCtx()->copyWith({});
+ mockCtxOne->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeOne);
+ const auto mockCtxTwo = getExpCtx()->copyWith({});
+ mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
+ auto unionWithOne = make_intrusive<DocumentSourceUnionWith>(
+ mockCtxOne,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ auto unionWithTwo = DocumentSourceUnionWith(
+ mockCtxTwo,
+ uassertStatusOK(Pipeline::create(
+ std::list<boost::intrusive_ptr<DocumentSource>>{unionWithOne}, getExpCtx())));
+ unionWithTwo.setSource(mock.get());
+
+ auto results = DocumentComparator().makeUnorderedDocumentSet();
+ for (auto& doc [[maybe_unused]] : docs) {
+ auto next = unionWithTwo.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ const auto [ignored, inserted] = results.insert(next.releaseDocument());
+ ASSERT_TRUE(inserted);
+ }
+ for (const auto& doc : docs)
+ ASSERT_TRUE(results.find(doc) != results.end());
+
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+}
+
+TEST_F(DocumentSourceUnionWithTest, UnionsWithNonEmptySubPipelines) {
+ const auto inputDocs = std::array{Document{{"a", 1}}, Document{{"b", 1}}, Document{{"c", 1}}};
+ const auto outputDocs = std::array{Document{{"a", 1}}, Document{{"c", 1}, {"d", 1}}};
+ const auto mock = DocumentSourceMock::createForTest(inputDocs[0]);
+ const auto mockDequeOne = std::deque<DocumentSource::GetNextResult>{Document{inputDocs[1]}};
+ const auto mockDequeTwo = std::deque<DocumentSource::GetNextResult>{Document{inputDocs[2]}};
+ const auto mockCtxOne = getExpCtx()->copyWith({});
+ mockCtxOne->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeOne);
+ const auto mockCtxTwo = getExpCtx()->copyWith({});
+ mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
+ const auto filter = DocumentSourceMatch::create(BSON("d" << 1), mockCtxOne);
+ const auto proj = DocumentSourceAddFields::create(BSON("d" << 1), mockCtxTwo);
+ auto unionWithOne = DocumentSourceUnionWith(
+ mockCtxOne,
+ uassertStatusOK(Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{filter},
+ getExpCtx())));
+ auto unionWithTwo = DocumentSourceUnionWith(
+ mockCtxTwo,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{proj}, getExpCtx())));
+ unionWithOne.setSource(mock.get());
+ unionWithTwo.setSource(&unionWithOne);
+
+ auto results = DocumentComparator().makeUnorderedDocumentSet();
+ for (auto& doc [[maybe_unused]] : outputDocs) {
+ auto next = unionWithTwo.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ const auto [ignored, inserted] = results.insert(next.releaseDocument());
+ ASSERT_TRUE(inserted);
+ }
+ for (const auto& doc : outputDocs)
+ ASSERT_TRUE(results.find(doc) != results.end());
+
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+}
+
+TEST_F(DocumentSourceUnionWithTest, SerializeAndParseWithPipeline) {
+ auto bson =
+ BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline"
+ << BSON_ARRAY(
+ BSON("$addFields" << BSON("a" << BSON("$const" << 3))))));
+ auto unionWith = DocumentSourceUnionWith::createFromBson(bson.firstElement(), getExpCtx());
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+ std::vector<Value> serializedArray;
+ unionWith->serializeToArray(serializedArray);
+ auto serializedBson = serializedArray[0].getDocument().toBson();
+ ASSERT_BSONOBJ_EQ(serializedBson, bson);
+ unionWith = DocumentSourceUnionWith::createFromBson(serializedBson.firstElement(), getExpCtx());
+ ASSERT(unionWith != nullptr);
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+}
+
+TEST_F(DocumentSourceUnionWithTest, SerializeAndParseWithoutPipeline) {
+ auto bson = BSON("$unionWith"
+ << "foo");
+ auto desugaredBson = BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline" << BSONArray()));
+ auto unionWith = DocumentSourceUnionWith::createFromBson(bson.firstElement(), getExpCtx());
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+ std::vector<Value> serializedArray;
+ unionWith->serializeToArray(serializedArray);
+ auto serializedBson = serializedArray[0].getDocument().toBson();
+ ASSERT_BSONOBJ_EQ(serializedBson, desugaredBson);
+ unionWith = DocumentSourceUnionWith::createFromBson(serializedBson.firstElement(), getExpCtx());
+ ASSERT(unionWith != nullptr);
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+}
+
+TEST_F(DocumentSourceUnionWithTest, SerializeAndParseWithoutPipelineExtraSubobject) {
+ auto bson = BSON("$unionWith" << BSON("coll"
+ << "foo"));
+ auto desugaredBson = BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline" << BSONArray()));
+ auto unionWith = DocumentSourceUnionWith::createFromBson(bson.firstElement(), getExpCtx());
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+ std::vector<Value> serializedArray;
+ unionWith->serializeToArray(serializedArray);
+ auto serializedBson = serializedArray[0].getDocument().toBson();
+ ASSERT_BSONOBJ_EQ(serializedBson, desugaredBson);
+ unionWith = DocumentSourceUnionWith::createFromBson(serializedBson.firstElement(), getExpCtx());
+ ASSERT(unionWith != nullptr);
+ ASSERT(unionWith->getSourceName() == DocumentSourceUnionWith::kStageName);
+}
+
+TEST_F(DocumentSourceUnionWithTest, ParseErrors) {
+ ASSERT_THROWS_CODE(DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << false).firstElement(), getExpCtx()),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+ ASSERT_THROWS_CODE(DocumentSourceUnionWith::createFromBson(BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "coll"
+ << "bar"))
+ .firstElement(),
+ getExpCtx()),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+ ASSERT_THROWS_CODE(
+ DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$addFields" << BSON("a" << 3))) << "coll"
+ << "bar"))
+ .firstElement(),
+ getExpCtx()),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+ ASSERT_THROWS_CODE(
+ DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$addFields" << BSON("a" << 3))) << "myDog"
+ << "bar"))
+ .firstElement(),
+ getExpCtx()),
+ AssertionException,
+ ErrorCodes::FailedToParse);
+ ASSERT_THROWS_CODE(
+ DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << BSON("coll"
+ << "foo"
+ << "pipeline"
+ << BSON_ARRAY(BSON("$petMyDog" << BSON("myDog" << 3)))))
+ .firstElement(),
+ getExpCtx()),
+ AssertionException,
+ 16436);
+}
+
+TEST_F(DocumentSourceUnionWithTest, PropagatePauses) {
+ const auto mock =
+ DocumentSourceMock::createForTest({Document(),
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document(),
+ DocumentSource::GetNextResult::makePauseExecution()});
+ const auto mockDequeOne = std::deque<DocumentSource::GetNextResult>{};
+ const auto mockDequeTwo = std::deque<DocumentSource::GetNextResult>{};
+ const auto mockCtxOne = getExpCtx()->copyWith({});
+ mockCtxOne->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeOne);
+ const auto mockCtxTwo = getExpCtx()->copyWith({});
+ mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
+ auto unionWithOne = DocumentSourceUnionWith(
+ mockCtxOne,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ auto unionWithTwo = DocumentSourceUnionWith(
+ mockCtxTwo,
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ unionWithOne.setSource(mock.get());
+ unionWithTwo.setSource(&unionWithOne);
+
+ ASSERT_TRUE(unionWithTwo.getNext().isAdvanced());
+ ASSERT_TRUE(unionWithTwo.getNext().isPaused());
+ ASSERT_TRUE(unionWithTwo.getNext().isAdvanced());
+ ASSERT_TRUE(unionWithTwo.getNext().isPaused());
+
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+ ASSERT_TRUE(unionWithTwo.getNext().isEOF());
+}
+
+} // namespace
+} // namespace mongo