summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@10gen.com>2022-01-12 18:22:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-12 19:11:54 +0000
commit36d096e95aa193d08eb934987d6617b65cc1eadf (patch)
treeeff52e4ca344524d1dd15bd374b23f06de27a472
parentc7b4557516fee750c4e743d5d67b3281bdcccfb1 (diff)
downloadmongo-36d096e95aa193d08eb934987d6617b65cc1eadf.tar.gz
SERVER-61447 Skip expression evaluation when AccumulatorFirst/AccumulatorFirstN are full in $group and $bucketAuto
-rw-r--r--src/mongo/db/pipeline/accumulator.h11
-rw-r--r--src/mongo/db/pipeline/accumulator_first.cpp1
-rw-r--r--src/mongo/db/pipeline/accumulator_multi.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h2
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp20
6 files changed, 33 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 3a32f06c184..3a7e95f4996 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -105,6 +105,11 @@ public:
/// Reset this accumulator to a fresh state, ready for a new call to startNewGroup.
virtual void reset() = 0;
+ /// True if the accumulator needs input, false otherwise.
+ bool needsInput() const {
+ return _needsInput;
+ }
+
virtual bool isAssociative() const {
return false;
}
@@ -152,6 +157,12 @@ protected:
/// subclasses are expected to update this as necessary
int _memUsageBytes = 0;
+ /// Member which tracks if this accumulator requires any more input values to compute its final
+ /// result. In general, most accumulators require all input values, however, some accumulators
+ /// can ignore input values under certain conditions. For example, $first can set this to
+ /// 'false' after it sees one value.
+ bool _needsInput = true;
+
private:
ExpressionContext* _expCtx;
};
diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp
index 5673db2540a..673546faee3 100644
--- a/src/mongo/db/pipeline/accumulator_first.cpp
+++ b/src/mongo/db/pipeline/accumulator_first.cpp
@@ -47,6 +47,7 @@ void AccumulatorFirst::processInternal(const Value& input, bool merging) {
_haveFirst = true;
_first = input;
_memUsageBytes = sizeof(*this) + input.getApproximateSize() - sizeof(Value);
+ _needsInput = false;
}
}
diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp
index afba2cb89b1..8affcdefb2c 100644
--- a/src/mongo/db/pipeline/accumulator_multi.cpp
+++ b/src/mongo/db/pipeline/accumulator_multi.cpp
@@ -365,6 +365,9 @@ void AccumulatorFirstLastN::_processValue(const Value& val) {
_memUsageBytes -= _deque.front().getApproximateSize();
_deque.pop_front();
} else {
+ // If our deque has 'n' elements and this is $firstN, we don't need to call process
+ // anymore.
+ _needsInput = false;
return;
}
}
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 1125589e4f9..10300e9c334 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -97,7 +97,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() {
}
invariant(populationResult.isEOF());
- initalizeBucketIteration();
+ initializeBucketIteration();
_populated = true;
}
@@ -206,13 +206,15 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>&
const size_t numAccumulators = _accumulatedFields.size();
for (size_t k = 0; k < numAccumulators; k++) {
- bucket._accums[k]->process(
- _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables),
- false);
+ if (bucket._accums[k]->needsInput()) {
+ bucket._accums[k]->process(
+ _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables),
+ false);
+ }
}
}
-void DocumentSourceBucketAuto::initalizeBucketIteration() {
+void DocumentSourceBucketAuto::initializeBucketIteration() {
// Initialize the iterator on '_sorter'.
invariant(_sorter);
_sortedInput.reset(_sorter->done());
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 44bd1e3f8f0..6a85e21c642 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -133,7 +133,7 @@ private:
*/
GetNextResult populateSorter();
- void initalizeBucketIteration();
+ void initializeBucketIteration();
/**
* Computes the 'groupBy' expression value for 'doc'.
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 73f3a76d81d..c184d51e827 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -592,22 +592,22 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi
accum->startNewGroup(initializerValue);
group.push_back(accum);
}
- } else {
- for (size_t i = 0; i < group.size(); i++) {
- // subtract old mem usage. New usage added back after processing.
- _memoryTracker.update(_accumulatedFields[i].fieldName,
- -1 * group[i]->getMemUsage());
- }
}
/* tickle all the accumulators for the group we found */
dassert(numAccumulators == group.size());
for (size_t i = 0; i < numAccumulators; i++) {
- group[i]->process(
- _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
- _doingMerge);
- _memoryTracker.update(_accumulatedFields[i].fieldName, group[i]->getMemUsage());
+ // Only process the input and update the memory footprint if the current accumulator
+ // needs more input.
+ if (group[i]->needsInput()) {
+ const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage();
+ group[i]->process(_accumulatedFields[i].expr.argument->evaluate(
+ rootDocument, &pExpCtx->variables),
+ _doingMerge);
+ _memoryTracker.update(_accumulatedFields[i].fieldName,
+ group[i]->getMemUsage() - prevMemUsage);
+ }
}
if (kDebugBuild && !storageGlobalParams.readOnly) {