summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2021-08-16 18:50:32 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-16 19:16:49 +0000
commit0f9e41dd9730b9bb4db3fcee4fef634922337061 (patch)
tree413c6b0f88c9639add9de9c1db257468bda053d0
parent0195e030346fe2986320107cede904b54727930d (diff)
downloadmongo-0f9e41dd9730b9bb4db3fcee4fef634922337061.tar.gz
SERVER-58582 Create $documents stage and implement collectionless unionWith (cherry-picked from commit 0d2789815060cf2d7ac511cf6a1be966243b5e84) (cherry-picked from commit dcb0c2f2001fc17be51c38db1319197973758a8c)
SERVER-59188 Fix coverity issue in DocumentSourceUnwind (cherry-picked from commit bb6e1f9f7ae13210b99a84cfef08eeb0d2d0a86a) (cherry-picked from commit e1112c5fa3509664d6997dea3f314e067ba1e82a)
-rw-r--r--jstests/aggregation/documents.js102
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_documents.cpp79
-rw-r--r--src/mongo/db/pipeline/document_source_documents.h46
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h16
-rw-r--r--src/mongo/db/pipeline/document_source_queue_test.cpp90
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp47
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.idl1
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h6
11 files changed, 420 insertions, 28 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/documents.js
new file mode 100644
index 00000000000..f9289a3baf0
--- /dev/null
+++ b/jstests/aggregation/documents.js
@@ -0,0 +1,102 @@
+// This is the test for $documents stage in aggregation pipeline.
+// The $documents stage follows these rules:
+// * $documents must be in the beginning of the pipeline,
+// * $documents content must evaluate to an array of objects.
+// $documents is not meant to be used in a sharded environment. It would return
+// the same result set for each shard which is counter intuitive. The test is disabled
+// for mongos.
+// @tags: [
+// do_not_wrap_aggregations_in_facets,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged,
+// assumes_read_concern_unchanged,
+// assumes_against_mongod_not_mongos
+// ]
+
+(function() {
+"use strict";
+load('jstests/libs/fixture_helpers.js'); // for runCommandOnEachPrimary.
+
+const writeConcernOptions = {
+ writeConcern: {w: "majority"}
+};
+
+const currDB = db;
+FixtureHelpers.runCommandOnEachPrimary(
+ {db: db.getSiblingDB("admin"), cmdObj: {setParameter: 1, enableSearchMeta: true}});
+const coll = currDB.documents;
+coll.drop(writeConcernOptions);
+coll.insert({a: 1}, writeConcernOptions);
+
+// $documents given an array of objects.
+const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray();
+
+assert.eq(2, docs.length);
+assert.eq(docs[0], {a1: 1});
+assert.eq(docs[1], {a1: 2});
+
+// $documents evaluates to an array of objects.
+const docs1 =
+ coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}],
+ writeConcernOptions)
+ .toArray();
+
+assert.eq(100, docs1.length);
+for (let i = 0; i < 100; i++) {
+ assert.eq(docs1[i], {x: i});
+}
+
+// $documents evaluates to an array of objects.
+const docsUnionWith =
+ coll.aggregate(
+ [
+ {$documents: [{a: 13}]},
+ {
+ $unionWith: {
+ pipeline:
+ [{$documents: {$map: {input: {$range: [0, 10]}, in : {x: "$$this"}}}}]
+ }
+ }
+ ],
+ writeConcernOptions)
+ .toArray();
+
+assert.eq(11, docsUnionWith.length);
+assert.eq(docsUnionWith[0], {a: 13});
+for (let i = 1; i < 11; i++) {
+ assert.eq(docsUnionWith[i], {x: i - 1});
+}
+
+// $documents with const objects inside $unionWith (no "coll").
+const res = coll.aggregate([{$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}],
+ writeConcernOptions)
+ .toArray();
+assert.eq(3, res.length);
+assert.eq(res[0]["a"], 1);
+assert.eq(res[1], {xx: 1});
+assert.eq(res[2], {xx: 2});
+
+function assertFails(pipeline, code) {
+ assert.commandFailedWithCode(currDB.runCommand({
+ aggregate: coll.getName(),
+ pipeline: pipeline,
+ writeConcern: writeConcernOptions.writeConcern,
+ cursor: {}
+ }),
+ code);
+}
+
+// Must fail due to misplaced $document.
+assertFails([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], 40602);
+// $unionWith must fail due to no $document
+assertFails([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], 9);
+
+// Must fail due to $documents producing array of non-objects.
+assertFails([{$documents: [1, 2, 3]}], 40228);
+
+// Must fail due $documents producing non-array.
+assertFails([{$documents: {a: 1}}], 5858203);
+
+// Must fail due $documents producing array of non-objects.
+assertFails([{$documents: {a: [1, 2, 3]}}], 5858203);
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 45430be6ed4..903f16f6854 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -216,6 +216,7 @@ pipelineEnv.Library(
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
+ 'document_source_documents.cpp',
'document_source_exchange.cpp',
'document_source_facet.cpp',
'document_source_geo_near.cpp',
@@ -368,6 +369,7 @@ env.CppUnitTest(
'document_source_out_test.cpp',
'document_source_plan_cache_stats_test.cpp',
'document_source_project_test.cpp',
+ 'document_source_queue_test.cpp',
'document_source_redact_test.cpp',
'document_source_replace_root_test.cpp',
'document_source_sample_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_documents.cpp b/src/mongo/db/pipeline/document_source_documents.cpp
new file mode 100644
index 00000000000..e3fb1c88e5d
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_documents.cpp
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/document_source_documents.h"
+#include "mongo/db/exec/projection_executor.h"
+#include "mongo/db/exec/projection_executor_builder.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_queue.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/query/query_knobs_gen.h"
+
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+
+REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(
+ documents,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceDocuments::createFromBson,
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44);
+
+std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(5858204,
+ "Must set 'enableSearchMeta' to true to use '$documents'",
+ enableSearchMeta.load());
+ // genField is a temporary field to hold docs to wire $project,
+ // $unwind, and $replaceRoot together.
+ auto genField = UUID::gen().toString();
+ auto projectContent = BSON(genField << elem);
+ auto queue = DocumentSourceQueue::create(expCtx);
+ queue->emplace_back(Document{});
+ /* Create the following pipeline from $documents: [...]
+ * => [ queue([{}]),
+ * project: {tempDocumentsField: [...]},
+ * unwind: "$tempDocumentsField",
+ * replaceWith: "$tempDocumentsField" ]
+ */
+ return {
+ queue,
+ DocumentSourceProject::create(projectContent, expCtx, elem.fieldNameStringData()),
+ DocumentSourceUnwind::create(expCtx, genField, false, {}, true),
+ DocumentSourceReplaceRoot::createFromBson(
+ BSON("$replaceRoot" << BSON("newRoot" << std::string("$") + genField)).firstElement(),
+ expCtx)};
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_documents.h b/src/mongo/db/pipeline/document_source_documents.h
new file mode 100644
index 00000000000..a6ab70e0c59
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_documents.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/query/projection_parser.h"
+
+namespace mongo {
+
+namespace DocumentSourceDocuments {
+
+static constexpr StringData kStageName = "$documents"_sd;
+
+static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+}; // namespace DocumentSourceDocuments
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index 12b8157117c..d762825c07f 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -33,6 +33,30 @@
namespace mongo {
+REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(
+ queue,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceQueue::createFromBson,
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44);
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceQueue::createFromBson(
+ BSONElement arrayElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(
+ 5858205, "Must set 'enableSearchMeta' to true to use '$queue'", enableSearchMeta.load());
+ uassert(5858201,
+ "literal documents specification must be an array",
+ arrayElem.type() == BSONType::Array);
+ auto queue = DocumentSourceQueue::create(expCtx);
+ // arrayElem is an Array and can be iterated through by using .Obj() method
+ for (auto elem : arrayElem.Obj()) {
+ uassert(5858202,
+ "literal documents specification must be an array of objects",
+ elem.type() == BSONType::Object);
+ queue->emplace_back(Document{elem.Obj()});
+ }
+ return queue;
+}
+
boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new DocumentSourceQueue({}, expCtx);
@@ -55,4 +79,13 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
_queue.pop_front();
return next;
}
+
+Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ ValueArrayStream vals;
+ for (auto elem : _queue) {
+ vals << elem.getDocument();
+ }
+ return Value(DOC(kStageName << vals.done()));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index 4c91abdf442..f24b0c875e4 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -43,23 +43,18 @@ namespace mongo {
*/
class DocumentSourceQueue : public DocumentSource {
public:
- static constexpr StringData kStageName = "queue"_sd;
+ static constexpr StringData kStageName = "$queue"_sd;
static boost::intrusive_ptr<DocumentSourceQueue> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
DocumentSourceQueue(std::deque<GetNextResult> results,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
- virtual ~DocumentSourceQueue() {}
+ ~DocumentSourceQueue() override = default;
const char* getSourceName() const override;
- Value serialize(
- boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
- // This stage is not intended to be serialized. Supporting a fully-general serialization is
- // not trivial since we'd have to invent a serialization format for each of the
- // GetNextResult states.
- MONGO_UNREACHABLE;
- }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override;
StageConstraints constraints(Pipeline::SplitState pipeState) const override {
StageConstraints constraints(StreamType::kStreaming,
@@ -99,6 +94,9 @@ public:
_queue.push_back(result);
}
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
protected:
GetNextResult doGetNext() override;
// Return documents from front of queue.
diff --git a/src/mongo/db/pipeline/document_source_queue_test.cpp b/src/mongo/db/pipeline/document_source_queue_test.cpp
new file mode 100644
index 00000000000..6ac8169f664
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_queue_test.cpp
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_queue.h"
+
+namespace mongo {
+namespace {
+
+using QueueStageTest = AggregationContextFixture;
+
+
+TEST_F(QueueStageTest, QueueStageDeserialization) {
+ std::ignore = ServerParameterSet::getGlobal()
+ ->getMap()
+ .find("enableSearchMeta")
+ ->second->setFromString("true");
+ auto queueDoc = BSON("$queue" << BSON_ARRAY(BSONObj()));
+ auto queueStage = DocumentSourceQueue::createFromBson(queueDoc.firstElement(), getExpCtx());
+ ASSERT_TRUE(queueStage);
+
+ auto expectedResult = Document{{"a"_sd, 1}};
+ auto queueDoc1 = BSON("$queue" << BSON_ARRAY(BSON("a" << 1)));
+ auto queueStage1 = DocumentSourceQueue::createFromBson(queueDoc1.firstElement(), getExpCtx());
+ ASSERT_TRUE(queueStage1);
+ auto next = queueStage1->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ auto result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+ ASSERT(queueStage1->getNext().isEOF());
+}
+
+TEST_F(QueueStageTest, QueueStageDeserializationFails) {
+ std::ignore = ServerParameterSet::getGlobal()
+ ->getMap()
+ .find("enableSearchMeta")
+ ->second->setFromString("true");
+ auto queueDoc = BSON("$queue" << BSONObj());
+ ASSERT_THROWS_CODE(DocumentSourceQueue::createFromBson(queueDoc.firstElement(), getExpCtx()),
+ AssertionException,
+ 5858201);
+
+ auto queueDoc2 = BSON("$queue" << BSON_ARRAY(1 << 2 << 3));
+ ASSERT_THROWS_CODE(DocumentSourceQueue::createFromBson(queueDoc2.firstElement(), getExpCtx()),
+ AssertionException,
+ 5858202);
+}
+
+
+TEST_F(QueueStageTest, QueueStageSerialize) {
+ auto queueStage = DocumentSourceQueue::create(getExpCtx());
+ queueStage->emplace_back(DOC("a1" << 1));
+ queueStage->emplace_back(DOC("a2" << 2));
+
+ ASSERT_TRUE(queueStage);
+
+ auto res = queueStage->serialize(boost::none);
+
+ ASSERT_VALUE_EQ(res, Value{DOC("$queue" << DOC_ARRAY(DOC("a1" << 1) << DOC("a2" << 2)))});
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 7f21dcf1922..e52540f5e58 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -33,6 +33,7 @@
#include <iterator>
#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/pipeline/document_source_documents.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_union_with.h"
@@ -90,6 +91,16 @@ DocumentSourceUnionWith::~DocumentSourceUnionWith() {
_pipeline.reset();
}
}
+
+void validateUnionWithCollectionlessPipeline(
+ const boost::optional<std::vector<mongo::BSONObj>>& pipeline) {
+ uassert(ErrorCodes::FailedToParse,
+ "$unionWith stage without explicit collection must have a pipeline with $documents as "
+ "first stage",
+ pipeline && pipeline->size() > 0 &&
+ !(*pipeline)[0].getField(DocumentSourceDocuments::kStageName).eoo());
+}
+
std::unique_ptr<DocumentSourceUnionWith::LiteParsed> DocumentSourceUnionWith::LiteParsed::parse(
const NamespaceString& nss, const BSONElement& spec) {
uassert(ErrorCodes::FailedToParse,
@@ -105,7 +116,13 @@ std::unique_ptr<DocumentSourceUnionWith::LiteParsed> DocumentSourceUnionWith::Li
} else {
auto unionWithSpec =
UnionWithSpec::parse(IDLParserErrorContext(kStageName), spec.embeddedObject());
- unionNss = NamespaceString(nss.db(), unionWithSpec.getColl());
+ if (unionWithSpec.getColl()) {
+ unionNss = NamespaceString(nss.db(), *unionWithSpec.getColl());
+ } else {
+ // If no collection specified, it must have $documents as first field in pipeline.
+ validateUnionWithCollectionlessPipeline(unionWithSpec.getPipeline());
+ unionNss = NamespaceString::makeCollectionlessAggregateNSS(nss.db());
+ }
// Recursively lite parse the nested pipeline, if one exists.
if (unionWithSpec.getPipeline()) {
@@ -157,7 +174,13 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::createFromBson(
} else {
auto unionWithSpec =
UnionWithSpec::parse(IDLParserErrorContext(kStageName), elem.embeddedObject());
- unionNss = NamespaceString(expCtx->ns.db().toString(), unionWithSpec.getColl());
+ if (unionWithSpec.getColl()) {
+ unionNss = NamespaceString(expCtx->ns.db().toString(), *unionWithSpec.getColl());
+ } else {
+ // if no collection specified, it must have $documents as first field in pipeline
+ validateUnionWithCollectionlessPipeline(unionWithSpec.getPipeline());
+ unionNss = NamespaceString::makeCollectionlessAggregateNSS(expCtx->ns.db());
+ }
pipeline = unionWithSpec.getPipeline().value_or(std::vector<BSONObj>{});
}
return make_intrusive<DocumentSourceUnionWith>(
@@ -252,6 +275,7 @@ void DocumentSourceUnionWith::doDispose() {
}
Value DocumentSourceUnionWith::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ auto collectionless = _pipeline->getContext()->ns.isCollectionlessAggregateNS();
if (explain) {
// There are several different possible states depending on the explain verbosity as well as
// the other stages in the pipeline:
@@ -276,8 +300,10 @@ Value DocumentSourceUnionWith::serialize(boost::optional<ExplainOptions::Verbosi
BSONArrayBuilder bab;
for (auto&& stage : _pipeline->serialize())
bab << stage;
- return Value(DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
- << "pipeline" << bab.arr())));
+ auto spec = collectionless
+ ? DOC("pipeline" << bab.arr())
+ : DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline" << bab.arr());
+ return Value(DOC(getSourceName() << spec));
}
invariant(pipeCopy);
@@ -287,15 +313,18 @@ Value DocumentSourceUnionWith::serialize(boost::optional<ExplainOptions::Verbosi
// We expect this to be an explanation of a pipeline -- there should only be one field.
invariant(explainLocal.nFields() == 1);
- return Value(
- DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline"
- << explainLocal.firstElement())));
+ auto spec = collectionless ? DOC("pipeline" << explainLocal.firstElement())
+ : DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline"
+ << explainLocal.firstElement());
+ return Value(DOC(getSourceName() << spec));
} else {
BSONArrayBuilder bab;
for (auto&& stage : _pipeline->serialize())
bab << stage;
- return Value(DOC(getSourceName() << DOC("coll" << _pipeline->getContext()->ns.coll()
- << "pipeline" << bab.arr())));
+ auto spec = collectionless
+ ? DOC("pipeline" << bab.arr())
+ : DOC("coll" << _pipeline->getContext()->ns.coll() << "pipeline" << bab.arr());
+ return Value(DOC(getSourceName() << spec));
}
}
diff --git a/src/mongo/db/pipeline/document_source_union_with.idl b/src/mongo/db/pipeline/document_source_union_with.idl
index 1987a9b37f5..77b3ea64a82 100644
--- a/src/mongo/db/pipeline/document_source_union_with.idl
+++ b/src/mongo/db/pipeline/document_source_union_with.idl
@@ -39,6 +39,7 @@ structs:
fields:
coll:
description: The collection to union with.
+ optional: true
type: string
pipeline:
description: An optional pipeline to apply to the collection being unioned.
diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp
index 95816bbb68e..7da658bbdd5 100644
--- a/src/mongo/db/pipeline/document_source_unwind.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind.cpp
@@ -48,7 +48,8 @@ class DocumentSourceUnwind::Unwinder {
public:
Unwinder(const FieldPath& unwindPath,
bool preserveNullAndEmptyArrays,
- const boost::optional<FieldPath>& indexPath);
+ const boost::optional<FieldPath>& indexPath,
+ bool strict);
/** Reset the unwinder to unwind a new document. */
void resetDocument(const Document& document);
@@ -75,6 +76,8 @@ private:
// If set, the $unwind stage will include the array index in the specified path, overwriting any
// existing value, setting to null when the value was a non-array or empty array.
const boost::optional<FieldPath> _indexPath;
+ // Specifies if input to $unwind is required to be an array.
+ const bool _strict;
Value _inputArray;
@@ -84,15 +87,17 @@ private:
vector<Position> _unwindPathFieldIndexes;
// Index into the _inputArray to return next.
- size_t _index;
+ size_t _index = 0;
};
DocumentSourceUnwind::Unwinder::Unwinder(const FieldPath& unwindPath,
bool preserveNullAndEmptyArrays,
- const boost::optional<FieldPath>& indexPath)
+ const boost::optional<FieldPath>& indexPath,
+ bool strict)
: _unwindPath(unwindPath),
_preserveNullAndEmptyArrays(preserveNullAndEmptyArrays),
- _indexPath(indexPath) {}
+ _indexPath(indexPath),
+ _strict(strict) {}
void DocumentSourceUnwind::Unwinder::resetDocument(const Document& document) {
// Reset document specific attributes.
@@ -114,6 +119,8 @@ DocumentSource::GetNextResult DocumentSourceUnwind::Unwinder::getNext() {
// this index in the output document, or null if the value didn't come from an array.
boost::optional<long long> indexForOutput;
+ uassert(
+ 5858203, "an array is expected", (_strict && _inputArray.getType() == Array) || !_strict);
if (_inputArray.getType() == Array) {
const size_t length = _inputArray.getArrayLength();
invariant(_index == 0 || _index < length);
@@ -159,12 +166,13 @@ DocumentSource::GetNextResult DocumentSourceUnwind::Unwinder::getNext() {
DocumentSourceUnwind::DocumentSourceUnwind(const intrusive_ptr<ExpressionContext>& pExpCtx,
const FieldPath& fieldPath,
bool preserveNullAndEmptyArrays,
- const boost::optional<FieldPath>& indexPath)
+ const boost::optional<FieldPath>& indexPath,
+ bool strict)
: DocumentSource(kStageName, pExpCtx),
_unwindPath(fieldPath),
_preserveNullAndEmptyArrays(preserveNullAndEmptyArrays),
_indexPath(indexPath),
- _unwinder(new Unwinder(fieldPath, preserveNullAndEmptyArrays, indexPath)) {}
+ _unwinder(new Unwinder(fieldPath, preserveNullAndEmptyArrays, indexPath, strict)) {}
REGISTER_DOCUMENT_SOURCE(unwind,
LiteParsedDocumentSourceDefault::parse,
@@ -178,12 +186,14 @@ intrusive_ptr<DocumentSourceUnwind> DocumentSourceUnwind::create(
const intrusive_ptr<ExpressionContext>& expCtx,
const string& unwindPath,
bool preserveNullAndEmptyArrays,
- const boost::optional<string>& indexPath) {
+ const boost::optional<string>& indexPath,
+ bool strict) {
intrusive_ptr<DocumentSourceUnwind> source(
new DocumentSourceUnwind(expCtx,
FieldPath(unwindPath),
preserveNullAndEmptyArrays,
- indexPath ? FieldPath(*indexPath) : boost::optional<FieldPath>()));
+ indexPath ? FieldPath(*indexPath) : boost::optional<FieldPath>(),
+ strict));
return source;
}
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index de7269d7639..757b906be54 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -78,7 +78,8 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const std::string& path,
bool includeNullIfEmptyOrMissing,
- const boost::optional<std::string>& includeArrayIndex);
+ const boost::optional<std::string>& includeArrayIndex,
+ bool strict = false);
std::string getUnwindPath() const {
return _unwindPath.fullPath();
@@ -96,7 +97,8 @@ private:
DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
const FieldPath& fieldPath,
bool includeNullIfEmptyOrMissing,
- const boost::optional<FieldPath>& includeArrayIndex);
+ const boost::optional<FieldPath>& includeArrayIndex,
+ bool strict);
GetNextResult doGetNext() final;