summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_lookup.cpp
diff options
context:
space:
mode:
authorRuslan Abdulkhalikov <ruslan.abdulkhalikov@mongodb.com>2021-08-23 14:45:40 -0700
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-27 19:02:48 +0000
commitc93908fe72deaf52d1c8ac88439173269658f3d4 (patch)
tree6e01953159908550165cd780b0ac39807e727a4c /src/mongo/db/pipeline/document_source_lookup.cpp
parentc6ed35a25b11504d4c13c3aa6307ce3e69cdcd54 (diff)
downloadmongo-c93908fe72deaf52d1c8ac88439173269658f3d4.tar.gz
SERVER-58807 Allow collectionless $lookup with $documents
- Handle the missing pipeline case in lookup - Handle $lookup with $documents and localField/foreignField - Added references to SERVER-59628 - Removed "1" from dbname
Diffstat (limited to 'src/mongo/db/pipeline/document_source_lookup.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp62
1 files changed, 51 insertions, 11 deletions
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 32fc1a9139f..c1850127909 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_documents.h"
#include "mongo/db/pipeline/document_source_merge_gen.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
@@ -189,6 +190,16 @@ DocumentSourceLookUp::DocumentSourceLookUp(
initializeResolvedIntrospectionPipeline();
}
+boost::optional<BSONObj> extractDocumentsStage(const std::vector<BSONObj>& pipeline) {
+ // TODO SERVER-59628 We should be able to check for any valid data source here, not just
+ // $documents.
+ if (pipeline.size() > 0 && pipeline[0].hasField(DocumentSourceDocuments::kStageName)) {
+ return {pipeline[0]};
+ } else {
+ return boost::none;
+ }
+}
+
DocumentSourceLookUp::DocumentSourceLookUp(
NamespaceString fromNs,
std::string as,
@@ -206,19 +217,27 @@ DocumentSourceLookUp::DocumentSourceLookUp(
std::tie(_localField, _foreignField) = *localForeignFields;
// Append a BSONObj to '_resolvedPipeline' as a placeholder for the stage corresponding to
- // the local/foreignField $match.
- _resolvedPipeline.reserve(_resolvedPipeline.size() + 1);
+ // the local/foreignField $match. It must next after $documents if present.
+ auto docs = extractDocumentsStage(pipeline);
+ int offset = docs ? 1 : 0;
+ _resolvedPipeline.reserve(_resolvedPipeline.size() + 1 + offset);
+ if (docs) {
+ _resolvedPipeline.push_back(*docs);
+ }
_resolvedPipeline.push_back(BSON("$match" << BSONObj()));
_fieldMatchPipelineIdx = _resolvedPipeline.size() - 1;
+ // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
+ _resolvedPipeline.insert(
+ _resolvedPipeline.end(), pipeline.begin() + offset, pipeline.end());
} else {
// When local/foreignFields are included, we cannot enable the cache because the $match
// is a correlated prefix that will not be detected. Here, local/foreignFields are absent,
// so we enable the cache.
_cache.emplace(internalDocumentSourceLookupCacheSizeBytes.load());
+ // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
+ _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
}
- // Add the user pipeline to '_resolvedPipeline' after any potential view prefix and $match
- _resolvedPipeline.insert(_resolvedPipeline.end(), pipeline.begin(), pipeline.end());
_userPipeline = std::move(pipeline);
for (auto&& varElem : letVariables) {
@@ -234,6 +253,22 @@ DocumentSourceLookUp::DocumentSourceLookUp(
initializeResolvedIntrospectionPipeline();
}
+void validateLookupCollectionlessPipeline(const vector<BSONObj>& pipeline) {
+ uassert(ErrorCodes::FailedToParse,
+ "$lookup stage without explicit collection must have a pipeline with $documents as "
+ "first stage",
+ pipeline.size() > 0 &&
+ // TODO SERVER-59628 We should be able to check for any valid data source here, not
+ // just $documents.
+ !pipeline[0].getField(DocumentSourceDocuments::kStageName).eoo());
+}
+
+void validateLookupCollectionlessPipeline(const BSONElement& pipeline) {
+ uassert(ErrorCodes::FailedToParse, "must specify 'pipeline' when 'from' is empty", pipeline);
+ auto parsedPipeline = parsePipelineFromBSON(pipeline);
+ validateLookupCollectionlessPipeline(parsedPipeline);
+}
+
std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
const NamespaceString& nss, const BSONElement& spec) {
uassert(ErrorCodes::FailedToParse,
@@ -243,16 +278,19 @@ std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LitePars
auto specObj = spec.Obj();
auto fromElement = specObj["from"];
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "missing 'from' option to $lookup stage specification: " << specObj,
- fromElement);
- auto fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
+ auto pipelineElem = specObj["pipeline"];
+ NamespaceString fromNss;
+ if (!fromElement) {
+ validateLookupCollectionlessPipeline(pipelineElem);
+ fromNss = NamespaceString::makeCollectionlessAggregateNSS(nss.db());
+ } else {
+ fromNss = parseLookupFromAndResolveNamespace(fromElement, nss.db());
+ }
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "invalid $lookup namespace: " << fromNss.ns(),
fromNss.isValid());
// Recursively lite parse the nested pipeline, if one exists.
- auto pipelineElem = specObj["pipeline"];
boost::optional<LiteParsedPipeline> liteParsedPipeline;
if (pipelineElem) {
auto pipeline = parsePipelineFromBSON(pipelineElem);
@@ -1209,8 +1247,10 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
}
}
- uassert(
- ErrorCodes::FailedToParse, "must specify 'from' field for a $lookup", !fromNs.ns().empty());
+ if (fromNs.ns().empty()) {
+ validateLookupCollectionlessPipeline(pipeline);
+ fromNs = NamespaceString::makeCollectionlessAggregateNSS(pExpCtx->ns.db());
+ }
uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
if (hasPipeline) {