diff options
author | ruslan.abdulkhalikov <ruslan.abdulkhalikov@mongodb.com> | 2022-05-17 00:48:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-17 02:02:30 +0000 |
commit | 252235fda4d45e85db342bb6437b1587b980b1ed (patch) | |
tree | 7622b604fa7a79579c3139f9f98dd97e2d2e11e0 | |
parent | 4ad4646e089410bee0926806d10f809baab9fbe9 (diff) | |
download | mongo-252235fda4d45e85db342bb6437b1587b980b1ed.tar.gz |
SERVER-65884 $lookup from ts optimize $sequentialCache
(cherry picked from commit a672dcd5658bdb7117de384e50e1747e610a5684)
3 files changed, 121 insertions, 0 deletions
diff --git a/jstests/core/timeseries/timeseries_lookup.js b/jstests/core/timeseries/timeseries_lookup.js index 75a80a48177..d95bf137f5a 100644 --- a/jstests/core/timeseries/timeseries_lookup.js +++ b/jstests/core/timeseries/timeseries_lookup.js @@ -85,6 +85,99 @@ TimeseriesTest.run((insert) => { assert.eq({host: "host_" + i, matchedB: entryCountPerHost[i]}, results[i], results); } + // Equality Match With Let (uncorrelated) + // Make sure injected $sequentialDocumentCache (right after unpack bucket stage) + // in the inner pipeline is removed. + results = collA.aggregate([ + { + $lookup: { + from: collB.getName(), + let: {"outer_hostname": "$tags.hostname"}, + pipeline: [ + // $match will be pushed before unpack bucket stage + {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}}, + ], + as: "matchedB" + } + }, { + $project: { + _id: 0, + host: "$tags.hostname", + matchedB: { + $size: "$matchedB" + } + } + }, + {$sort: {host: 1}} + ]).toArray(); + assert.eq(numHosts, results.length, results); + for (let i = 0; i < numHosts; i++) { + const matched = i === 0 ? numDocs : 0; + assert.eq({host: "host_" + i, matchedB: matched}, results[i], results); + } + + // Equality Match With Let (uncorrelated) + // Make sure injected $sequentialDocumentCache in the inner pipeline is removed. + // $sequentialDocumentCache is not located right after unpack bucket stage. + results = collA.aggregate([ + { + $lookup: { + from: collB.getName(), + let: {"outer_hostname": "$tags.hostname"}, + pipeline: [ + {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}}, + {$set: {foo: {$const: 123}}}, // uncorrelated + ], + as: "matchedB" + } + }, { + $project: { + _id: 0, + host: "$tags.hostname", + matchedB: { + $size: "$matchedB" + } + } + }, + {$sort: {host: 1}} + ]).toArray(); + assert.eq(numHosts, results.length, results); + for (let i = 0; i < numHosts; i++) { + const matched = i === 0 ? numDocs : 0; + assert.eq({host: "host_" + i, matchedB: matched}, results[i], results); + } + + // Equality Match With Let (correlated, no $match re-order) + // Make sure injected $sequentialDocumentCache in the inner pipeline is removed. + // $sequentialDocumentCache is located at the very end of pipeline. + results = collA.aggregate([ + { + $lookup: { + from: collB.getName(), + let: {"outer_hostname": "$tags.hostname"}, + pipeline: [ + {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}}, + {$set: {foo: "$$outer_hostname"}}, // correlated + ], + as: "matchedB" + } + }, { + $project: { + _id: 0, + host: "$tags.hostname", + matchedB: { + $size: "$matchedB" + } + } + }, + {$sort: {host: 1}} + ]).toArray(); + assert.eq(numHosts, results.length, results); + for (let i = 0; i < numHosts; i++) { + const matched = i === 0 ? numDocs : 0; + assert.eq({host: "host_" + i, matchedB: matched}, results[i], results); + } + // Unequal joins results = collA.aggregate([ { diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 272d7a1a80c..f3a96903a8e 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -53,6 +53,7 @@ #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sample.h" +#include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" @@ -980,6 +981,15 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta tryInsertBucketLevelSortAndGroup(AccumulatorDocumentsNeeded::kLastDocument); } +// Find $sequentialDocumentCache in the rest of the pipeline. +Pipeline::SourceContainer::iterator findSequentialDocumentCache( + Pipeline::SourceContainer::iterator start, Pipeline::SourceContainer::iterator end) { + while (start != end && !dynamic_cast<DocumentSourceSequentialDocumentCache*>(start->get())) { + start = std::next(start); + } + return start; +} + Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); @@ -1075,6 +1085,18 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi } Pipeline::optimizeEndOfPipeline(itr, container); + + // $sequentialDocumentCache is full pipeline context aware. The call to + // optimizeEndOfPipeline above isolates part of the pipeline and $sequentialDocumentCache + // optimization applies incorrectly. The second call to + // $sequentialDocumentCache->optimizeAt() is no-op, so it has to be forced. + auto cache = findSequentialDocumentCache(std::next(itr), container->end()); + if (cache != container->end()) { + auto cacheDocSource = + dynamic_cast<DocumentSourceSequentialDocumentCache*>(cache->get()); + cacheDocSource->forceOptimizeAt(cache, container); + } + if (std::next(itr) == container->end()) { return container->end(); } else { diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index 4e81390d4ef..743394ac853 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -97,6 +97,12 @@ public: return newStage; } + Pipeline::SourceContainer::iterator forceOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + _hasOptimizedPos = false; + return doOptimizeAt(itr, container); + } + protected: GetNextResult doGetNext() final; Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, |