diff options
-rw-r--r-- | jstests/aggregation/documents.js | 104 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.cpp | 1 |
3 files changed, 142 insertions, 25 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/documents.js index 136adb64e7c..302b850ec02 100644 --- a/jstests/aggregation/documents.js +++ b/jstests/aggregation/documents.js @@ -16,7 +16,11 @@ (function() { "use strict"; +load("jstests/aggregation/extras/utils.js"); // For resultsEq. + const dbName = jsTestName(); +// TODO SERVER-59097 - expose $documents and get rid of internal +// client here const writeConcernOptions = { writeConcern: {w: "majority"} }; @@ -36,6 +40,11 @@ const coll = currDB.documents; coll.drop(writeConcernOptions); coll.insert({a: 1}, writeConcernOptions); +const lookup_coll = currDB.lookup_coll; +lookup_coll.drop(writeConcernOptions); +for (let i = 0; i < 10; i++) { + lookup_coll.insert({id_name: i, name: "name_" + i}, writeConcernOptions); +} // $documents given an array of objects. const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray(); @@ -62,39 +71,106 @@ const docsUnionWith = { $unionWith: { pipeline: - [{$documents: {$map: {input: {$range: [0, 10]}, in : {x: "$$this"}}}}] + [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}] } } ], writeConcernOptions) .toArray(); -assert.eq(11, docsUnionWith.length); -assert.eq(docsUnionWith[0], {a: 13}); -for (let i = 1; i < 11; i++) { - assert.eq(docsUnionWith[i], {x: i - 1}); +assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith)); + +{ // $documents with const objects inside $unionWith (no "coll"). + const res = coll.aggregate( + [ + {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}, + {$project: {_id: 0}} + ], + writeConcernOptions) + .toArray(); + assert(resultsEq([{a: 1}, {xx: 1}, {xx: 2}], res)); +} + +{ // $documents with const objects inside $lookup (no "coll", explicit $match). + const res = lookup_coll.aggregate([ + { + $lookup: { + let: {"id_lookup": "$id_name"}, + pipeline: [ + {$documents: [{xx: 1}, {xx: 2}]}, + { + $match: + { + $expr: + { + $eq: + ["$$id_lookup", "$xx"] + } + } + } + ], + as: "names" + } + }, + {$match: {"names": {"$ne": []}}}, + {$project: {_id: 0}} + ], + writeConcernOptions) + .toArray(); + assert(resultsEq( + [ + {id_name: 1, name: "name_1", names: [{"xx": 1}]}, + {id_name: 2, name: "name_2", names: [{"xx": 2}]} + ], + res)); } -// $documents with const objects inside $unionWith (no "coll"). -const res = coll.aggregate([{$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}], - writeConcernOptions) - .toArray(); -assert.eq(3, res.length); -assert.eq(res[0]["a"], 1); -assert.eq(res[1], {xx: 1}); -assert.eq(res[2], {xx: 2}); +{ // $documents with const objects inside $lookup (no "coll", + localField/foreignField). + const res = lookup_coll.aggregate([ + { + $lookup: { + localField: "id_name", + foreignField: "xx", + pipeline: [ + {$documents: [{xx: 1}, {xx: 2}]} + ], + as: "names" + } + }, + {$match: {"names": {"$ne": []}}}, + {$project: {_id: 0}} + ], + writeConcernOptions) + .toArray(); + assert(resultsEq( + [ + {id_name: 1, name: "name_1", names: [{"xx": 1}]}, + {id_name: 2, name: "name_2", names: [{"xx": 2}]} + ], + res)); +} // Must fail due to misplaced $document. assert.throwsWithCode(() => { coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], writeConcernOptions); }, 40602); + // $unionWith must fail due to no $document assert.throwsWithCode(() => { coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], writeConcernOptions); }, 9); +// Test that $lookup fails due to no 'from' argument and no $documents stage. +assert.throwsWithCode(() => { + coll.aggregate([{$lookup: {let: {"id_lookup": "$id_name"}, as: "aa", pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], + writeConcernOptions); +}, 9); +// Test that $lookup fails due to no 'from' argument and no pipeline field. +assert.throwsWithCode(() => { + coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}], writeConcernOptions); +}, 9); // Must fail due to $documents producing array of non-objects. assert.throwsWithCode(() => { coll.aggregate([{$documents: [1, 2, 3]}], writeConcernOptions); @@ -109,4 +185,4 @@ assert.throwsWithCode(() => { assert.throwsWithCode(() => { coll.aggregate([{$documents: {a: [1, 2, 3]}}], writeConcernOptions); }, 5858203); -})();
\ No newline at end of file +})(); diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 32fc1a9139f..c1850127909 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -42,6 +42,7 @@ #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_documents.h" #include "mongo/db/pipeline/document_source_merge_gen.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" @@ -189,6 +190,16 @@ DocumentSourceLookUp::DocumentSourceLookUp( initializeResolvedIntrospectionPipeline(); } +boost::optional<BSONObj> extractDocumentsStage(const std::vector<BSONObj>& pipeline) { + // TODO SERVER-59628 We should be able to check for any valid data source here, not just + // $documents. + if (pipeline.size() > 0 && pipeline[0].hasField(DocumentSourceDocuments::kStageName)) { + return {pipeline[0]}; + } else { + return boost::none; + } +} + DocumentSourceLookUp::DocumentSourceLookUp( NamespaceString fromNs, std::string as, @@ -206,19 +217,27 @@ DocumentSourceLookUp::DocumentSourceLookUp( std::tie(_localField, _foreignField) = *localForeignFields; // Append a BSONObj to '_resolvedPipeline' as a placeholder for the stage corresponding to - // the local/foreignField $match. - _resolvedPipeline.reserve(_resolvedPipeline.size() + 1); + // the local/foreignField $match. It must next after $documents if present. + auto docs = extractDocumentsStage(pipeline); + int offset = docs ? 1 : 0; + _resolvedPipeline.reserve(_resolvedPipeline.size() + 1 + offset); + if (docs) { + _resolvedPipeline.push_back(*docs); + } _resolvedPipeline.push_back(BSON("$match" << BSONObj())); _fieldMatchPipelineIdx = _resolvedPipeline.size() - 1; + // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match + _resolvedPipeline.insert( + _resolvedPipeline.end(), pipeline.begin() + offset, pipeline.end()); } else { // When local/foreignFields are included, we cannot enable the cache because the $match // is a correlated prefix that will not be detected. Here, local/foreignFields are absent, // so we enable the cache. _cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load()); + // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match + _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end()); } - // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match - _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end()); _userPipeline = std::move(pipeline); for (auto&& varElem : letVariables) { @@ -234,6 +253,22 @@ DocumentSourceLookUp::DocumentSourceLookUp( initializeResolvedIntrospectionPipeline(); } +void validateLookupCollectionlessPipeline(const vector<BSONObj>& pipeline) { + uassert(ErrorCodes::FailedToParse, + "$lookup stage without explicit collection must have a pipeline with $documents as " + "first stage", + pipeline.size() > 0 && + // TODO SERVER-59628 We should be able to check for any valid data source here, not + // just $documents. + !pipeline[0].getField(DocumentSourceDocuments::kStageName).eoo()); +} + +void validateLookupCollectionlessPipeline(const BSONElement& pipeline) { + uassert(ErrorCodes::FailedToParse, "must specify 'pipeline' when 'from' is empty", pipeline); + auto parsedPipeline = parsePipelineFromBSON(pipeline); + validateLookupCollectionlessPipeline(parsedPipeline); +} + std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse( const NamespaceString& nss, const BSONElement& spec) { uassert(ErrorCodes::FailedToParse, @@ -243,16 +278,19 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars auto specObj = spec.Obj(); auto fromElement = specObj["from"]; - uassert(ErrorCodes::FailedToParse, - str::stream() << "missing 'from' option to $lookup stage specification: " << specObj, - fromElement); - auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db()); + auto pipelineElem = specObj["pipeline"]; + NamespaceString fromNss; + if (!fromElement) { + validateLookupCollectionlessPipeline(pipelineElem); + fromNss = NamespaceString::makeCollectionlessAggregateNSS(nss.db()); + } else { + fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db()); + } uassert(ErrorCodes::InvalidNamespace, str::stream() << "invalid $lookup namespace: " << fromNss.ns(), fromNss.isValid()); // Recursively lite parse the nested pipeline, if one exists. - auto pipelineElem = specObj["pipeline"]; boost::optional<LiteParsedPipeline> liteParsedPipeline; if (pipelineElem) { auto pipeline = parsePipelineFromBSON(pipelineElem); @@ -1209,8 +1247,10 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson( } } - uassert( - ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty()); + if (fromNs.ns().empty()) { + validateLookupCollectionlessPipeline(pipeline); + fromNs = NamespaceString::makeCollectionlessAggregateNSS(pExpCtx->ns.db()); + } uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty()); if (hasPipeline) { diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index fabda886e93..a9f64b14044 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -88,6 +88,7 @@ void validateUnionWithCollectionlessPipeline( "$unionWith stage without explicit collection must have a pipeline with $documents as " "first stage", pipeline && pipeline->size() > 0 && + // TODO SERVER-59628 replace with constraints check !(*pipeline)[0].getField(DocumentSourceDocuments::kStageName).eoo()); } |