summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorruslan.abdulkhalikov <ruslan.abdulkhalikov@mongodb.com>2022-05-17 00:48:27 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-17 02:37:50 +0000
commit8d8d5077f1ab5d161919b3a3131ce10c8a96b5f9 (patch)
tree4da016b3566b8bb1620ac4fecd8b3050da0414d4
parent007846ad9c22495af276034c6d855572947e2742 (diff)
downloadmongo-8d8d5077f1ab5d161919b3a3131ce10c8a96b5f9.tar.gz
SERVER-65884 $lookup from ts optimize $sequentialCache
(cherry picked from commit a672dcd5658bdb7117de384e50e1747e610a5684)
-rw-r--r--jstests/core/timeseries/timeseries_lookup.js93
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h6
3 files changed, 121 insertions, 0 deletions
diff --git a/jstests/core/timeseries/timeseries_lookup.js b/jstests/core/timeseries/timeseries_lookup.js
index c173764a591..98801b63e43 100644
--- a/jstests/core/timeseries/timeseries_lookup.js
+++ b/jstests/core/timeseries/timeseries_lookup.js
@@ -84,6 +84,99 @@ TimeseriesTest.run((insert) => {
assert.eq({host: "host_" + i, matchedB: entryCountPerHost[i]}, results[i], results);
}
+ // Equality Match With Let (uncorrelated)
+ // Make sure injected $sequentialDocumentCache (right after unpack bucket stage)
+ // in the inner pipeline is removed.
+ results = collA.aggregate([
+ {
+ $lookup: {
+ from: collB.getName(),
+ let: {"outer_hostname": "$tags.hostname"},
+ pipeline: [
+ // $match will be pushed before unpack bucket stage
+ {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+ ],
+ as: "matchedB"
+ }
+ }, {
+ $project: {
+ _id: 0,
+ host: "$tags.hostname",
+ matchedB: {
+ $size: "$matchedB"
+ }
+ }
+ },
+ {$sort: {host: 1}}
+ ]).toArray();
+ assert.eq(numHosts, results.length, results);
+ for (let i = 0; i < numHosts; i++) {
+ const matched = i === 0 ? numDocs : 0;
+ assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+ }
+
+ // Equality Match With Let (uncorrelated)
+ // Make sure injected $sequentialDocumentCache in the inner pipeline is removed.
+ // $sequentialDocumentCache is not located right after unpack bucket stage.
+ results = collA.aggregate([
+ {
+ $lookup: {
+ from: collB.getName(),
+ let: {"outer_hostname": "$tags.hostname"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+ {$set: {foo: {$const: 123}}}, // uncorrelated
+ ],
+ as: "matchedB"
+ }
+ }, {
+ $project: {
+ _id: 0,
+ host: "$tags.hostname",
+ matchedB: {
+ $size: "$matchedB"
+ }
+ }
+ },
+ {$sort: {host: 1}}
+ ]).toArray();
+ assert.eq(numHosts, results.length, results);
+ for (let i = 0; i < numHosts; i++) {
+ const matched = i === 0 ? numDocs : 0;
+ assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+ }
+
+ // Equality Match With Let (correlated, no $match re-order)
+ // Make sure injected $sequentialDocumentCache in the inner pipeline is removed.
+ // $sequentialDocumentCache is located at the very end of pipeline.
+ results = collA.aggregate([
+ {
+ $lookup: {
+ from: collB.getName(),
+ let: {"outer_hostname": "$tags.hostname"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$$outer_hostname", hosts[0].tags.hostname]}}},
+ {$set: {foo: "$$outer_hostname"}}, // correlated
+ ],
+ as: "matchedB"
+ }
+ }, {
+ $project: {
+ _id: 0,
+ host: "$tags.hostname",
+ matchedB: {
+ $size: "$matchedB"
+ }
+ }
+ },
+ {$sort: {host: 1}}
+ ]).toArray();
+ assert.eq(numHosts, results.length, results);
+ for (let i = 0; i < numHosts; i++) {
+ const matched = i === 0 ? numDocs : 0;
+ assert.eq({host: "host_" + i, matchedB: matched}, results[i], results);
+ }
+
// Unequal joins
results = collA.aggregate([
{
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 5040fdc2445..875cbf6b8a6 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/document_source_sample.h"
+#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -838,6 +839,15 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
return true;
}
+// Find $sequentialDocumentCache in the rest of the pipeline.
+Pipeline::SourceContainer::iterator findSequentialDocumentCache(
+ Pipeline::SourceContainer::iterator start, Pipeline::SourceContainer::iterator end) {
+ while (start != end && !dynamic_cast<DocumentSourceSequentialDocumentCache*>(start->get())) {
+ start = std::next(start);
+ }
+ return start;
+}
+
Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
@@ -933,6 +943,18 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
}
Pipeline::optimizeEndOfPipeline(itr, container);
+
+ // $sequentialDocumentCache is full pipeline context aware. The call to
+ // optimizeEndOfPipeline above isolates part of the pipeline and $sequentialDocumentCache
+ // optimization applies incorrectly. The second call to
+ // $sequentialDocumentCache->optimizeAt() is no-op, so it has to be forced.
+ auto cache = findSequentialDocumentCache(std::next(itr), container->end());
+ if (cache != container->end()) {
+ auto cacheDocSource =
+ dynamic_cast<DocumentSourceSequentialDocumentCache*>(cache->get());
+ cacheDocSource->forceOptimizeAt(cache, container);
+ }
+
if (std::next(itr) == container->end()) {
return container->end();
} else {
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index 792b3b56a53..2b27effb430 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -96,6 +96,12 @@ public:
return newStage;
}
+ Pipeline::SourceContainer::iterator forceOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ _hasOptimizedPos = false;
+ return doOptimizeAt(itr, container);
+ }
+
protected:
GetNextResult doGetNext() final;
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,