summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/aggregation/documents.js104
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp62
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp1
3 files changed, 142 insertions, 25 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/documents.js
index 136adb64e7c..302b850ec02 100644
--- a/jstests/aggregation/documents.js
+++ b/jstests/aggregation/documents.js
@@ -16,7 +16,11 @@
(function() {
"use strict";
+load("jstests/aggregation/extras/utils.js"); // For resultsEq.
+
const dbName = jsTestName();
+// TODO SERVER-59097 - expose $documents and get rid of internal
+// client here
const writeConcernOptions = {
writeConcern: {w: "majority"}
};
@@ -36,6 +40,11 @@ const coll = currDB.documents;
coll.drop(writeConcernOptions);
coll.insert({a: 1}, writeConcernOptions);
+const lookup_coll = currDB.lookup_coll;
+lookup_coll.drop(writeConcernOptions);
+for (let i = 0; i < 10; i++) {
+ lookup_coll.insert({id_name: i, name: "name_" + i}, writeConcernOptions);
+}
// $documents given an array of objects.
const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray();
@@ -62,39 +71,106 @@ const docsUnionWith =
{
$unionWith: {
pipeline:
- [{$documents: {$map: {input: {$range: [0, 10]}, in : {x: "$$this"}}}}]
+ [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}]
}
}
],
writeConcernOptions)
.toArray();
-assert.eq(11, docsUnionWith.length);
-assert.eq(docsUnionWith[0], {a: 13});
-for (let i = 1; i < 11; i++) {
- assert.eq(docsUnionWith[i], {x: i - 1});
+assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith));
+
+{ // $documents with const objects inside $unionWith (no "coll").
+ const res = coll.aggregate(
+ [
+ {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}},
+ {$project: {_id: 0}}
+ ],
+ writeConcernOptions)
+ .toArray();
+ assert(resultsEq([{a: 1}, {xx: 1}, {xx: 2}], res));
+}
+
+{ // $documents with const objects inside $lookup (no "coll", explicit $match).
+ const res = lookup_coll.aggregate([
+ {
+ $lookup: {
+ let: {"id_lookup": "$id_name"},
+ pipeline: [
+ {$documents: [{xx: 1}, {xx: 2}]},
+ {
+ $match:
+ {
+ $expr:
+ {
+ $eq:
+ ["$$id_lookup", "$xx"]
+ }
+ }
+ }
+ ],
+ as: "names"
+ }
+ },
+ {$match: {"names": {"$ne": []}}},
+ {$project: {_id: 0}}
+ ],
+ writeConcernOptions)
+ .toArray();
+ assert(resultsEq(
+ [
+ {id_name: 1, name: "name_1", names: [{"xx": 1}]},
+ {id_name: 2, name: "name_2", names: [{"xx": 2}]}
+ ],
+ res));
}
-// $documents with const objects inside $unionWith (no "coll").
-const res = coll.aggregate([{$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}],
- writeConcernOptions)
- .toArray();
-assert.eq(3, res.length);
-assert.eq(res[0]["a"], 1);
-assert.eq(res[1], {xx: 1});
-assert.eq(res[2], {xx: 2});
+{ // $documents with const objects inside $lookup (no "coll", + localField/foreignField).
+ const res = lookup_coll.aggregate([
+ {
+ $lookup: {
+ localField: "id_name",
+ foreignField: "xx",
+ pipeline: [
+ {$documents: [{xx: 1}, {xx: 2}]}
+ ],
+ as: "names"
+ }
+ },
+ {$match: {"names": {"$ne": []}}},
+ {$project: {_id: 0}}
+ ],
+ writeConcernOptions)
+ .toArray();
+ assert(resultsEq(
+ [
+ {id_name: 1, name: "name_1", names: [{"xx": 1}]},
+ {id_name: 2, name: "name_2", names: [{"xx": 2}]}
+ ],
+ res));
+}
// Must fail due to misplaced $document.
assert.throwsWithCode(() => {
coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}],
writeConcernOptions);
}, 40602);
+
// $unionWith must fail due to no $document
assert.throwsWithCode(() => {
coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}],
writeConcernOptions);
}, 9);
+// Test that $lookup fails due to no 'from' argument and no $documents stage.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$lookup: {let: {"id_lookup": "$id_name"}, as: "aa", pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}],
+ writeConcernOptions);
+}, 9);
+// Test that $lookup fails due to no 'from' argument and no pipeline field.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}], writeConcernOptions);
+}, 9);
// Must fail due to $documents producing array of non-objects.
assert.throwsWithCode(() => {
coll.aggregate([{$documents: [1, 2, 3]}], writeConcernOptions);
@@ -109,4 +185,4 @@ assert.throwsWithCode(() => {
assert.throwsWithCode(() => {
coll.aggregate([{$documents: {a: [1, 2, 3]}}], writeConcernOptions);
}, 5858203);
-})(); \ No newline at end of file
+})();
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 32fc1a9139f..c1850127909 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_documents.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
@@ -189,6 +190,16 @@ DocumentSourceLookUp::DocumentSourceLookUp(
initializeResolvedIntrospectionPipeline();
}
+boost::optional<BSONObj> extractDocumentsStage(const std::vector<BSONObj>& pipeline) {
+ // TODO SERVER-59628 We should be able to check for any valid data source here, not just
+ // $documents.
+ if (pipeline.size() > 0 && pipeline[0].hasField(DocumentSourceDocuments::kStageName)) {
+ return {pipeline[0]};
+ } else {
+ return boost::none;
+ }
+}
+
DocumentSourceLookUp::DocumentSourceLookUp(
NamespaceString fromNs,
std::string as,
@@ -206,19 +217,27 @@ DocumentSourceLookUp::DocumentSourceLookUp(
std::tie(_localField, _foreignField) = *localForeignFields;
// Append a BSONObj to '_resolvedPipeline' as a placeholder for the stage corresponding to
- // the local/foreignField $match.
- _resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
+ // the local/foreignField $match. It must next after $documents if present.
+ auto docs = extractDocumentsStage(pipeline);
+ int offset = docs ? 1 : 0;
+ _resolvedPipeline.reserve(_resolvedPipeline.size() + 1 + offset);
+ if (docs) {
+ _resolvedPipeline.push_back(*docs);
+ }
_resolvedPipeline.push_back(BSON("$match" << BSONObj()));
_fieldMatchPipelineIdx = _resolvedPipeline.size() - 1;
+ // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
+ _resolvedPipeline.insert(
+ _resolvedPipeline.end(), pipeline.begin() + offset, pipeline.end());
} else {
// When local/foreignFields are included, we cannot enable the cache because the $match
// is a correlated prefix that will not be detected. Here, local/foreignFields are absent,
// so we enable the cache.
_cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
+ // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
+ _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
}
- // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
- _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
_userPipeline = std::move(pipeline);
for (auto&& varElem : letVariables) {
@@ -234,6 +253,22 @@ DocumentSourceLookUp::DocumentSourceLookUp(
initializeResolvedIntrospectionPipeline();
}
+void validateLookupCollectionlessPipeline(const vector<BSONObj>& pipeline) {
+ uassert(ErrorCodes::FailedToParse,
+ "$lookup stage without explicit collection must have a pipeline with $documents as "
+ "first stage",
+ pipeline.size() > 0 &&
+ // TODO SERVER-59628 We should be able to check for any valid data source here, not
+ // just $documents.
+ !pipeline[0].getField(DocumentSourceDocuments::kStageName).eoo());
+}
+
+void validateLookupCollectionlessPipeline(const BSONElement& pipeline) {
+ uassert(ErrorCodes::FailedToParse, "must specify 'pipeline' when 'from' is empty", pipeline);
+ auto parsedPipeline = parsePipelineFromBSON(pipeline);
+ validateLookupCollectionlessPipeline(parsedPipeline);
+}
+
std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
const NamespaceString& nss, const BSONElement& spec) {
uassert(ErrorCodes::FailedToParse,
@@ -243,16 +278,19 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
auto specObj = spec.Obj();
auto fromElement = specObj["from"];
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
- fromElement);
- auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
+ auto pipelineElem = specObj["pipeline"];
+ NamespaceString fromNss;
+ if (!fromElement) {
+ validateLookupCollectionlessPipeline(pipelineElem);
+ fromNss = NamespaceString::makeCollectionlessAggregateNSS(nss.db());
+ } else {
+ fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
+ }
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
fromNss.isValid());
// Recursively lite parse the nested pipeline, if one exists.
- auto pipelineElem = specObj["pipeline"];
boost::optional<LiteParsedPipeline> liteParsedPipeline;
if (pipelineElem) {
auto pipeline = parsePipelineFromBSON(pipelineElem);
@@ -1209,8 +1247,10 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
}
}
- uassert(
- ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
+ if (fromNs.ns().empty()) {
+ validateLookupCollectionlessPipeline(pipeline);
+ fromNs = NamespaceString::makeCollectionlessAggregateNSS(pExpCtx->ns.db());
+ }
uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
if (hasPipeline) {
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index fabda886e93..a9f64b14044 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -88,6 +88,7 @@ void validateUnionWithCollectionlessPipeline(
"$unionWith stage without explicit collection must have a pipeline with $documents as "
"first stage",
pipeline && pipeline->size() > 0 &&
+ // TODO SERVER-59628 replace with constraints check
!(*pipeline)[0].getField(DocumentSourceDocuments::kStageName).eoo());
}