diff options
author | ruslan.abdulkhalikov <ruslan.abdulkhalikov@mongodb.com> | 2022-05-17 00:48:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-17 02:37:50 +0000 |
commit | 8d8d5077f1ab5d161919b3a3131ce10c8a96b5f9 (patch) | |
tree | 4da016b3566b8bb1620ac4fecd8b3050da0414d4 | |
parent | 007846ad9c22495af276034c6d855572947e2742 (diff) | |
download | mongo-8d8d5077f1ab5d161919b3a3131ce10c8a96b5f9.tar.gz |
SERVER-65884 $lookup from ts optimize $sequentialCache
(cherry picked from commit a672dcd5658bdb7117de384e50e1747e610a5684)
3 files changed, 121 insertions, 0 deletions
```diff
diff --git a/jstests/core/timeseries/timeseries_lookup.js b/jstests/core/timeseries/timeseries_lookup.js
index c173764a591..98801b63e43 100644
--- a/jstests/core/timeseries/timeseries_lookup.js
+++ b/jstests/core/timeseries/timeseries_lookup.js
@@ -84,6 +84,99 @@ TimeseriesTest.run((insert) => {
         assert.eq({host: "host_" + i, matchedB: entryCountPerHost[i]}, results[i], results);
     }
 
+    // Equality Match With Let (uncorrelated)
+    // Make sure injected $sequentialDocumentCache (right after unpack bucket stage)
+    // in the inner pipeline is removed.
+    results = collA.aggregate([
+        {
+            $lookup: {
+                from: collB.getName(),
+                let: {"outer_hostname": "$tags.hostname"},
+                pipeline: [
+                    // $match will be pushed before unpack bucket stage
+                    {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+                ],
+                as: "matchedB"
+            }
+        }, {
+            $project: {
+                _id: 0,
+                host: "$tags.hostname",
+                matchedB: {
+                    $size: "$matchedB"
+                }
+            }
+        },
+        {$sort: {host: 1}}
+    ]).toArray();
+    assert.eq(numHosts, results.length, results);
+    for (let i = 0; i < numHosts; i++) {
+        const matched = i === 0 ? numDocs : 0;
+        assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+    }
+
+    // Equality Match With Let (uncorrelated)
+    // Make sure injected $sequentialDocumentCache in the inner pipeline is removed.
+    // $sequentialDocumentCache is not located right after unpack bucket stage.
+    results = collA.aggregate([
+        {
+            $lookup: {
+                from: collB.getName(),
+                let: {"outer_hostname": "$tags.hostname"},
+                pipeline: [
+                    {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+                    {$set: {foo: {$const: 123}}},  // uncorrelated
+                ],
+                as: "matchedB"
+            }
+        }, {
+            $project: {
+                _id: 0,
+                host: "$tags.hostname",
+                matchedB: {
+                    $size: "$matchedB"
+                }
+            }
+        },
+        {$sort: {host: 1}}
+    ]).toArray();
+    assert.eq(numHosts, results.length, results);
+    for (let i = 0; i < numHosts; i++) {
+        const matched = i === 0 ? numDocs : 0;
+        assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+    }
+
+    // Equality Match With Let (correlated, no $match re-order)
+    // Make sure injected $sequentialDocumentCache in the inner pipeline is removed.
+    // $sequentialDocumentCache is located at the very end of pipeline.
+    results = collA.aggregate([
+        {
+            $lookup: {
+                from: collB.getName(),
+                let: {"outer_hostname": "$tags.hostname"},
+                pipeline: [
+                    {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+                    {$set: {foo: "$$outer_hostname"}},  // correlated
+                ],
+                as: "matchedB"
+            }
+        }, {
+            $project: {
+                _id: 0,
+                host: "$tags.hostname",
+                matchedB: {
+                    $size: "$matchedB"
+                }
+            }
+        },
+        {$sort: {host: 1}}
+    ]).toArray();
+    assert.eq(numHosts, results.length, results);
+    for (let i = 0; i < numHosts; i++) {
+        const matched = i === 0 ? numDocs : 0;
+        assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+    }
+
     // Unequal joins
     results = collA.aggregate([
         {
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 5040fdc2445..875cbf6b8a6 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -54,6 +54,7 @@
 #include "mongo/db/pipeline/document_source_project.h"
 #include "mongo/db/pipeline/document_source_replace_root.h"
 #include "mongo/db/pipeline/document_source_sample.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
 #include "mongo/db/pipeline/document_source_single_document_transformation.h"
 #include "mongo/db/pipeline/document_source_sort.h"
 #include "mongo/db/pipeline/expression_context.h"
@@ -838,6 +839,15 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
     return true;
 }
 
+// Find $sequentialDocumentCache in the rest of the pipeline.
+Pipeline::SourceContainer::iterator findSequentialDocumentCache(
+    Pipeline::SourceContainer::iterator start, Pipeline::SourceContainer::iterator end) {
+    while (start != end && !dynamic_cast<DocumentSourceSequentialDocumentCache*>(start->get())) {
+        start = std::next(start);
+    }
+    return start;
+}
+
 Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
     Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
     invariant(*itr == this);
@@ -933,6 +943,18 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
     }
 
     Pipeline::optimizeEndOfPipeline(itr, container);
+
+    // $sequentialDocumentCache is full pipeline context aware. The call to
+    // optimizeEndOfPipeline above isolates part of the pipeline and $sequentialDocumentCache
+    // optimization applies incorrectly. The second call to
+    // $sequentialDocumentCache->optimizeAt() is no-op, so it has to be forced.
+    auto cache = findSequentialDocumentCache(std::next(itr), container->end());
+    if (cache != container->end()) {
+        auto cacheDocSource =
+            dynamic_cast<DocumentSourceSequentialDocumentCache*>(cache->get());
+        cacheDocSource->forceOptimizeAt(cache, container);
+    }
+
     if (std::next(itr) == container->end()) {
         return container->end();
     } else {
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index 792b3b56a53..2b27effb430 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -96,6 +96,12 @@ public:
         return newStage;
     }
 
+    Pipeline::SourceContainer::iterator forceOptimizeAt(Pipeline::SourceContainer::iterator itr,
+                                                        Pipeline::SourceContainer* container) {
+        _hasOptimizedPos = false;
+        return doOptimizeAt(itr, container);
+    }
+
 protected:
     GetNextResult doGetNext() final;
     Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
```