From 36d096e95aa193d08eb934987d6617b65cc1eadf Mon Sep 17 00:00:00 2001 From: Mihai Andrei Date: Wed, 12 Jan 2022 18:22:33 +0000 Subject: SERVER-61447 Skip expression evaluation when AccumulatorFirst/AccumulatorFirstN are full in $group and $bucketAuto --- src/mongo/db/pipeline/accumulator.h | 11 +++++++++++ src/mongo/db/pipeline/accumulator_first.cpp | 1 + src/mongo/db/pipeline/accumulator_multi.cpp | 3 +++ .../db/pipeline/document_source_bucket_auto.cpp | 12 +++++++----- src/mongo/db/pipeline/document_source_bucket_auto.h | 2 +- src/mongo/db/pipeline/document_source_group.cpp | 20 ++++++++++---------- 6 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 3a32f06c184..3a7e95f4996 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -105,6 +105,11 @@ public: /// Reset this accumulator to a fresh state, ready for a new call to startNewGroup. virtual void reset() = 0; + /// True if the accumulator needs input, false otherwise. + bool needsInput() const { + return _needsInput; + } + virtual bool isAssociative() const { return false; } @@ -152,6 +157,12 @@ protected: /// subclasses are expected to update this as necessary int _memUsageBytes = 0; + /// Member which tracks if this accumulator requires any more input values to compute its final + /// result. In general, most accumulators require all input values; however, some accumulators + /// can ignore input values under certain conditions. For example, $first can set this to + /// 'false' after it sees one value. 
+ bool _needsInput = true; + private: ExpressionContext* _expCtx; }; diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp index 5673db2540a..673546faee3 100644 --- a/src/mongo/db/pipeline/accumulator_first.cpp +++ b/src/mongo/db/pipeline/accumulator_first.cpp @@ -47,6 +47,7 @@ void AccumulatorFirst::processInternal(const Value& input, bool merging) { _haveFirst = true; _first = input; _memUsageBytes = sizeof(*this) + input.getApproximateSize() - sizeof(Value); + _needsInput = false; } } diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp index afba2cb89b1..8affcdefb2c 100644 --- a/src/mongo/db/pipeline/accumulator_multi.cpp +++ b/src/mongo/db/pipeline/accumulator_multi.cpp @@ -365,6 +365,9 @@ void AccumulatorFirstLastN::_processValue(const Value& val) { _memUsageBytes -= _deque.front().getApproximateSize(); _deque.pop_front(); } else { + // If our deque has 'n' elements and this is $firstN, we don't need to call process + // anymore. 
+ _needsInput = false; return; } } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 1125589e4f9..10300e9c334 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -97,7 +97,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() { } invariant(populationResult.isEOF()); - initalizeBucketIteration(); + initializeBucketIteration(); _populated = true; } @@ -206,13 +206,15 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair& const size_t numAccumulators = _accumulatedFields.size(); for (size_t k = 0; k < numAccumulators; k++) { - bucket._accums[k]->process( - _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables), - false); + if (bucket._accums[k]->needsInput()) { + bucket._accums[k]->process( + _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables), + false); + } } } -void DocumentSourceBucketAuto::initalizeBucketIteration() { +void DocumentSourceBucketAuto::initializeBucketIteration() { // Initialize the iterator on '_sorter'. invariant(_sorter); _sortedInput.reset(_sorter->done()); diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 44bd1e3f8f0..6a85e21c642 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -133,7 +133,7 @@ private: */ GetNextResult populateSorter(); - void initalizeBucketIteration(); + void initializeBucketIteration(); /** * Computes the 'groupBy' expression value for 'doc'. 
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 73f3a76d81d..c184d51e827 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -592,22 +592,22 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi accum->startNewGroup(initializerValue); group.push_back(accum); } - } else { - for (size_t i = 0; i < group.size(); i++) { - // subtract old mem usage. New usage added back after processing. - _memoryTracker.update(_accumulatedFields[i].fieldName, - -1 * group[i]->getMemUsage()); - } } /* tickle all the accumulators for the group we found */ dassert(numAccumulators == group.size()); for (size_t i = 0; i < numAccumulators; i++) { - group[i]->process( - _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables), - _doingMerge); - _memoryTracker.update(_accumulatedFields[i].fieldName, group[i]->getMemUsage()); + // Only process the input and update the memory footprint if the current accumulator + // needs more input. + if (group[i]->needsInput()) { + const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage(); + group[i]->process(_accumulatedFields[i].expr.argument->evaluate( + rootDocument, &pExpCtx->variables), + _doingMerge); + _memoryTracker.update(_accumulatedFields[i].fieldName, + group[i]->getMemUsage() - prevMemUsage); + } } if (kDebugBuild && !storageGlobalParams.readOnly) { -- cgit v1.2.1