author      Nick Zolnierz <nicholas.zolnierz@mongodb.com>       2021-03-31 14:35:10 -0400
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-04-30 14:22:03 +0000
commit      f36132ec219dd8cfdab420403f10d18666249947 (patch)
tree        b670698b942e7dbf84e01ae9e1afd061680ec457
parent      3295126e8a081ca57af1a07c6c582fac7a825efd (diff)
download    mongo-f36132ec219dd8cfdab420403f10d18666249947.tar.gz
SERVER-54664 Add per-function execution stats to $setWindowFields stage
10 files changed, 382 insertions, 88 deletions
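
Context for the diff below: after this change, explain output at "executionStats" or
"allPlansExecution" verbosity reports a per-function memory high-water mark on each
$_internalSetWindowFields stage. A minimal shell sketch of the user-visible effect (the
'sales' collection, field names, and byte counts are illustrative, not taken from a real run):

    // Hypothetical 'sales' collection; numbers below are made up.
    const explainOut = db.sales.explain("executionStats").aggregate([{
        $setWindowFields: {
            partitionBy: "$region",
            output: {total: {$sum: "$amount"}, items: {$push: "$item"}}
        }
    }]);
    // Each $_internalSetWindowFields stage in 'explainOut' now carries a document
    // keyed by output field name, e.g.:
    //   "maxFunctionMemoryUsageBytes": {"total": 64, "items": 20480}
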
diff --git a/jstests/aggregation/sources/setWindowFields/explain.js b/jstests/aggregation/sources/setWindowFields/explain.js
new file mode 100644
index 00000000000..fbae93889d8
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/explain.js
@@ -0,0 +1,149 @@
+/**
+ * Tests that the $setWindowFields stage reports memory footprint per function when explain is run
+ * with verbosities "executionStats" and "allPlansExecution".
+ *
+ * @tags: [assumes_against_mongod_not_mongos]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/analyze_plan.js");  // For getAggPlanStages().
+
+const coll = db[jsTestName()];
+coll.drop();
+const bigStr = Array(1025).toString();  // 1KB of ','
+const nDocs = 1000;
+const nPartitions = 50;
+const docSize = 8 + 8 + 1024;
+
+const featureEnabled =
+    assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+        .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+    jsTestLog("Skipping test because the window function feature flag is disabled");
+    return;
+}
+
+let bulk = coll.initializeUnorderedBulkOp();
+for (let i = 1; i <= nDocs; i++) {
+    bulk.insert({_id: i, key: i % nPartitions, bigStr: bigStr});
+}
+assert.commandWorked(bulk.execute());
+
+/**
+ * Checks that the execution stats in the explain output for a $setWindowFields stage are as
+ * expected.
+ * - 'pipeline' is the aggregation pipeline whose explain output is checked.
+ * - 'expectedFunctionMemUsages' is used to check the memory footprint stats for each function.
+ * - 'verbosity' indicates the explain verbosity used.
+ */
+function checkExplainResult(pipeline, expectedFunctionMemUsages, verbosity) {
+    const stages =
+        getAggPlanStages(coll.explain(verbosity).aggregate(pipeline), "$_internalSetWindowFields");
+    for (let stage of stages) {
+        assert(stage.hasOwnProperty("$_internalSetWindowFields"), stage);
+
+        if (verbosity === "executionStats" || verbosity === "allPlansExecution") {
+            assert(stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
+            const maxFunctionMemUsages = stage["maxFunctionMemoryUsageBytes"];
+            for (let field of Object.keys(maxFunctionMemUsages)) {
+                // Ensures that the expected functions are all included and the corresponding
+                // memory usage is in a reasonable range.
+                if (expectedFunctionMemUsages.hasOwnProperty(field)) {
+                    assert.gt(maxFunctionMemUsages[field],
+                              expectedFunctionMemUsages[field],
+                              "mismatch for function '" + field + "': " + tojson(stage));
+                    assert.lt(maxFunctionMemUsages[field],
+                              2 * expectedFunctionMemUsages[field],
+                              "mismatch for function '" + field + "': " + tojson(stage));
+                }
+            }
+        } else {
+            assert(!stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
+        }
+    }
+}
+
+(function testQueryPlannerVerbosity() {
+    const pipeline = [
+        {
+            $setWindowFields:
+                {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}}
+        },
+    ];
+    // 'checkExplainResult' runs the explain itself, so it takes the pipeline rather than
+    // already-extracted explain stages.
+    checkExplainResult(pipeline, {}, "queryPlanner");
+})();
+
+(function testUnboundedMemUsage() {
+    let pipeline = [
+        {
+            $setWindowFields:
+                {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}}
+        },
+    ];
+
+    // The $setWindowFields stage "streams" one partition at a time, so there's only one instance
+    // of each function. For the default [unbounded, unbounded] window type, each function's
+    // memory usage is comparable to that of its $group counterpart.
+    let expectedFunctionMemUsages = {
+        count: 60,
+        push: nDocs * 1024,
+        set: 1024,
+    };
+
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+
+    // Test that the memory footprint is reduced with partitioning.
+    pipeline = [
+        {
+            $setWindowFields: {
+                partitionBy: "$key",
+                output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}
+            }
+        },
+    ];
+    expectedFunctionMemUsages = {
+        count: 60,
+        push: (nDocs / nPartitions) * 1024,
+        set: 1024,
+    };
+
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+})();
+
+(function testSlidingWindowMemUsage() {
+    const windowSize = 10;
+    let pipeline = [
+        {
+            $setWindowFields: {
+                sortBy: {_id: 1},
+                output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+            }
+        },
+    ];
+    const expectedFunctionMemUsages = {
+        runningSum: windowSize * 16 +
+            160,  // 10x64-bit integer values per window, and 160 for the $sum state.
+    };
+
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+
+    // Adding partitioning doesn't change the peak memory usage.
+    pipeline = [
+        {
+            $setWindowFields: {
+                partitionBy: "$key",
+                sortBy: {_id: 1},
+                output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+            }
+        },
+    ];
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+    checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+})();
+}());
diff --git a/jstests/aggregation/sources/setWindowFields/memory_limit.js b/jstests/aggregation/sources/setWindowFields/memory_limit.js
index 6e705beaa4b..f960780d06b 100644
--- a/jstests/aggregation/sources/setWindowFields/memory_limit.js
+++ b/jstests/aggregation/sources/setWindowFields/memory_limit.js
@@ -58,6 +58,7 @@ assert.commandWorked(coll.runCommand({
     }],
     cursor: {}
 }));
+
 // Test that the query fails with a window function that stores documents.
 assert.commandFailedWithCode(coll.runCommand({
     aggregate: coll.getName(),
diff --git a/jstests/noPassthrough/explain_group_stage_exec_stats.js b/jstests/noPassthrough/explain_group_stage_exec_stats.js
index 55be2c77dc2..257d9676baa 100644
--- a/jstests/noPassthrough/explain_group_stage_exec_stats.js
+++ b/jstests/noPassthrough/explain_group_stage_exec_stats.js
@@ -45,7 +45,7 @@ const expectedAccumMemUsages = {
 function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldSpillToDisk) {
     // Tracks the memory usage per accumulator in total as 'stages' passed in could be the explain
     // output across a cluster.
-    let totalAccmMemoryUsageBytes = 0;
+    let totalAccumMemoryUsageBytes = 0;
 
     for (let stage of stages) {
         assert(stage.hasOwnProperty("$group"), stage);
@@ -54,7 +54,7 @@ function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldS
             assert(stage.hasOwnProperty("maxAccumulatorMemoryUsageBytes"), stage);
             const maxAccmMemUsages = stage["maxAccumulatorMemoryUsageBytes"];
             for (let field of Object.keys(maxAccmMemUsages)) {
-                totalAccmMemoryUsageBytes += maxAccmMemUsages[field];
+                totalAccumMemoryUsageBytes += maxAccmMemUsages[field];
 
                 // Ensures that the expected accumulators are all included and the corresponding
                 // memory usage is in a reasonable range. Note that in debug mode, data will be
@@ -69,8 +69,11 @@ function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldS
         }
     }
 
+    // Add some wiggle room to the total memory used compared to the limit parameter since the check
+    // for spilling to disk happens after each document is processed.
     if (shouldSpillToDisk)
-        assert.gt(maxMemoryLimitForGroupStage, totalAccmMemoryUsageBytes);
+        assert.gt(
+            maxMemoryLimitForGroupStage + 3 * 1024, totalAccumMemoryUsageBytes, tojson(stages));
 }
 
 let groupStages = getAggPlanStages(coll.explain("executionStats").aggregate(pipeline), "$group");
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 5326cf864af..c4ca7bc932f 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -133,39 +133,37 @@ const char* DocumentSourceGroup::getSourceName() const {
     return kStageName.rawData();
 }
 
-bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory) {
-    if (!_memoryTracker.allowDiskUse &&
-        (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) {
-        _memoryTracker.memoryUsageBytes -= saveMemory();
+bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory() {
+    if (!_memoryTracker._allowDiskUse &&
+        (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes)) {
+        freeMemory();
     }
 
-    if (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) {
+    if (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes) {
         uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
                 "Exceeded memory limit for $group, but didn't allow external sort."
                 " Pass allowDiskUse:true to opt in.",
-                _memoryTracker.allowDiskUse);
-        _memoryTracker.memoryUsageBytes = 0;
+                _memoryTracker._allowDiskUse);
+        _memoryTracker.set(0);
         return true;
     }
     return false;
 }
 
-int DocumentSourceGroup::freeMemory() {
+void DocumentSourceGroup::freeMemory() {
     invariant(_groups);
-    int totalMemorySaved = 0;
     for (auto&& group : *_groups) {
        for (size_t i = 0; i < group.second.size(); i++) {
-            auto prevMemUsage = group.second[i]->getMemUsage();
+            // Subtract the current usage.
+            _memoryTracker.update(_accumulatedFields[i].fieldName,
+                                  -1 * group.second[i]->getMemUsage());
+
             group.second[i]->reduceMemoryConsumptionIfAble();
-            auto memorySaved = prevMemUsage - group.second[i]->getMemUsage();
             // Update the memory usage for this AccumulationStatement.
-            _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes -= memorySaved;
-            // Update the memory usage for this group.
-            totalMemorySaved += memorySaved;
+            _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage());
        }
     }
-    return totalMemorySaved;
 }
 
 DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
@@ -306,16 +304,12 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
     MutableDocument out;
     out[getSourceName()] = Value(insides.freeze());
 
-    if (explain >= ExplainOptions::Verbosity::kExecStats) {
+    if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
         MutableDocument md;
-        invariant(_accumulatedFields.size() == _memoryTracker.accumStatementMemoryBytes.size());
 
         for (size_t i = 0; i < _accumulatedFields.size(); i++) {
-            md[_accumulatedFields[i].fieldName] = _stats.usedDisk
-                ? Value(static_cast<long long>(
-                      _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes))
-                : Value(static_cast<long long>(
-                      _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes));
+            md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>(
+                _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes()));
         }
 
         out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue());
@@ -390,9 +384,8 @@ intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
     groupStage->setIdExpression(groupByExpression);
     for (auto&& statement : accumulationStatements) {
         groupStage->addAccumulator(statement);
+        groupStage->_memoryTracker.set(statement.fieldName, 0);
     }
-    groupStage->_memoryTracker.accumStatementMemoryBytes.resize(accumulationStatements.size(),
-                                                                {0, 0});
 
     return groupStage;
 }
@@ -498,10 +491,9 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
             // Any other field will be treated as an accumulator specification.
             groupStage->addAccumulator(
                 AccumulationStatement::parseAccumulationStatement(expCtx.get(), groupField, vps));
+            groupStage->_memoryTracker.set(pFieldName, 0);
         }
     }
-    groupStage->_memoryTracker.accumStatementMemoryBytes.resize(
-        groupStage->getAccumulatedFields().size(), {0, 0});
 
     uassert(
         15955, "a group specification must include an _id", !groupStage->_idExpressions.empty());
@@ -546,7 +538,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
     GetNextResult input = pSource->getNext();
     for (; input.isAdvanced(); input = pSource->getNext()) {
-        if (shouldSpillWithAttemptToSaveMemory([this]() { return freeMemory(); })) {
+        if (shouldSpillWithAttemptToSaveMemory()) {
             _sortedFiles.push_back(spill());
         }
@@ -564,7 +556,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
         vector<uint64_t> oldAccumMemUsage(numAccumulators, 0);
         if (inserted) {
-            _memoryTracker.memoryUsageBytes += id.getApproximateSize();
+            _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize());
 
             // Initialize and add the accumulators
             Value expandedId = expandId(id);
@@ -581,8 +573,8 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
         } else {
             for (size_t i = 0; i < group.size(); i++) {
                 // subtract old mem usage. New usage added back after processing.
-                _memoryTracker.memoryUsageBytes -= group[i]->getMemUsage();
-                oldAccumMemUsage[i] = group[i]->getMemUsage();
+                _memoryTracker.update(_accumulatedFields[i].fieldName,
+                                      -1 * group[i]->getMemUsage());
             }
         }
 
@@ -593,18 +585,16 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
             group[i]->process(
                 _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
                 _doingMerge);
-
-            _memoryTracker.memoryUsageBytes += group[i]->getMemUsage();
-            _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes +=
-                group[i]->getMemUsage() - oldAccumMemUsage[i];
+            _memoryTracker.update(_accumulatedFields[i].fieldName, group[i]->getMemUsage());
         }
 
         if (kDebugBuild && !storageGlobalParams.readOnly) {
             // In debug mode, spill every time we have a duplicate id to stress merge logic.
-            if (!inserted &&                     // is a dup
-                !pExpCtx->inMongos &&            // can't spill to disk in mongos
-                !_memoryTracker.allowDiskUse &&  // don't change behavior when testing external sort
-                _sortedFiles.size() < 20) {      // don't open too many FDs
+            if (!inserted &&           // is a dup
+                !pExpCtx->inMongos &&  // can't spill to disk in mongos
+                !_memoryTracker
+                     ._allowDiskUse &&  // don't change behavior when testing external sort
+                _sortedFiles.size() < 20) {  // don't open too many FDs
 
                 _sortedFiles.push_back(spill());
             }
@@ -697,14 +687,10 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
     metricsCollector.incrementSorterSpills(1);
 
     _groups->clear();
-    // Update the max memory consumption per accumulation statement if the previous max was exceeded
-    // prior to spilling. Then zero out the current per-accumulation statement memory consumption,
-    // as the memory has been freed by spilling.
-    for (size_t i = 0; i < _memoryTracker.accumStatementMemoryBytes.size(); i++) {
-        _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes =
-            std::max(_memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes,
-                     _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes);
-        _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes = 0;
+    // Zero out the current per-accumulation statement memory consumption, as the memory has been
+    // freed by spilling.
+    for (auto accum : _accumulatedFields) {
+        _memoryTracker.set(accum.fieldName, 0);
     }
 
     Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
@@ -788,9 +774,8 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr
         copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse(
             pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps);
         mergingGroup->addAccumulator(copiedAccumulatedField);
+        mergingGroup->_memoryTracker.set(copiedAccumulatedField.fieldName, 0);
     }
-    mergingGroup->_memoryTracker.accumStatementMemoryBytes.resize(_accumulatedFields.size(),
-                                                                  {0, 0});
 
     // {shardsStage, mergingStage, sortPattern}
     return DistributedPlanLogic{this, mergingGroup, boost::none};
@@ -889,7 +874,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
 }
 
 size_t DocumentSourceGroup::getMaxMemoryUsageBytes() const {
-    return _memoryTracker.maxMemoryUsageBytes;
+    return _memoryTracker._maxAllowedMemoryUsageBytes;
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 7497b597c75..f42945fa1a1 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -35,6 +35,7 @@
 #include "mongo/db/pipeline/accumulation_statement.h"
 #include "mongo/db/pipeline/accumulator.h"
 #include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
 #include "mongo/db/pipeline/transformer_interface.h"
 #include "mongo/db/sorter/sorter.h"
 
@@ -193,24 +194,6 @@ protected:
     void doDispose() final;
 
 private:
-    struct MemoryUsageTracker {
-        struct AccumStatementMemoryTracker {
-            // Maximum memory consumption thus far observed. Only updated when data is spilled to
-            // disk during execution of the $group.
-            uint64_t maxMemoryBytes;
-            // Tracks the current memory footprint.
-            uint64_t currentMemoryBytes;
-        };
-
-        const bool allowDiskUse;
-        const size_t maxMemoryUsageBytes;
-
-        // Tracks current memory used. This variable will be reset if data is spilled to disk.
-        size_t memoryUsageBytes = 0;
-        // Tracks memory consumption per accumulation statement.
-        std::vector<AccumStatementMemoryTracker> accumStatementMemoryBytes;
-    };
-
     explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                  boost::optional<size_t> maxMemoryUsageBytes = boost::none);
 
@@ -247,7 +230,7 @@ private:
      * If we ran out of memory, finish all the pending operations so that some memory
      * can be freed.
      */
-    int freeMemory();
+    void freeMemory();
 
     Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
 
@@ -269,12 +252,11 @@ private:
     /**
      * Cleans up any pending memory usage. Throws error, if memory usage is above
-     * 'maxMemoryUsageBytes' and cannot spill to disk. The 'saveMemory' function should return
-     * the amount of memory saved by the cleanup.
+     * 'maxMemoryUsageBytes' and cannot spill to disk.
      *
      * Returns true, if the caller should spill to disk, false otherwise.
      */
-    bool shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory);
+    bool shouldSpillWithAttemptToSaveMemory();
 
     std::vector<AccumulationStatement> _accumulatedFields;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index 49700ee71a6..9a3e3d3498f 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -207,7 +207,11 @@ list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::create(
 
     // $_internalSetWindowFields
     result.push_back(make_intrusive<DocumentSourceInternalSetWindowFields>(
-        expCtx, simplePartitionByExpr, sortBy, outputFields));
+        expCtx,
+        simplePartitionByExpr,
+        sortBy,
+        outputFields,
+        internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load()));
 
     // $unset
     if (complexPartitionBy) {
@@ -246,7 +250,21 @@ Value DocumentSourceInternalSetWindowFields::serialize(
     }
 
     spec[SetWindowFieldsSpec::kOutputFieldName] = output.freezeToValue();
-    return Value(DOC(kStageName << spec.freeze()));
+    MutableDocument out;
+    out[getSourceName()] = Value(spec.freeze());
+
+    if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
+        MutableDocument md;
+
+        for (auto&& [fieldName, function] : _executableOutputs) {
+            md[fieldName] =
+                Value(static_cast<long long>(_memoryTracker[fieldName].maxMemoryBytes()));
+        }
+
+        out["maxFunctionMemoryUsageBytes"] = Value(md.freezeToValue());
+    }
+
+    return Value(out.freezeToValue());
 }
 
 boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::createFromBson(
@@ -278,14 +296,18 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::crea
     }
 
     return make_intrusive<DocumentSourceInternalSetWindowFields>(
-        expCtx, partitionBy, sortBy, outputFields);
+        expCtx,
+        partitionBy,
+        sortBy,
+        outputFields,
+        internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load());
 }
 
 void DocumentSourceInternalSetWindowFields::initialize() {
-    _maxMemory = internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load();
     for (auto& wfs : _outputFields) {
         _executableOutputs[wfs.fieldName] =
             WindowFunctionExec::create(pExpCtx.get(), &_iterator, wfs, _sortBy);
+        _memoryTracker.set(wfs.fieldName, _executableOutputs[wfs.fieldName]->getApproximateSize());
     }
     _init = true;
 }
@@ -307,13 +329,19 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
     // Populate the output document with the result from each window function.
     MutableDocument addFieldsSpec;
-    size_t functionMemUsage = 0;
     for (auto&& [fieldName, function] : _executableOutputs) {
+        auto oldIteratorMemUsage = _iterator.getApproximateSize();
         addFieldsSpec.addField(fieldName, function->getNext());
-        functionMemUsage += function->getApproximateSize();
+
+        // Update the memory usage for this function after getNext().
+        _memoryTracker.set(fieldName, function->getApproximateSize());
+        // Account for the additional memory in the iterator cache.
+        _memoryTracker.set(_memoryTracker.currentMemoryBytes() +
+                           (_iterator.getApproximateSize() - oldIteratorMemUsage));
+
         uassert(5414201,
                 "Exceeded memory limit in DocumentSourceSetWindowFields",
-                functionMemUsage + _iterator.getApproximateSize() < _maxMemory);
+                _memoryTracker.currentMemoryBytes() < _memoryTracker._maxAllowedMemoryUsageBytes);
     }
 
     // Advance the iterator and handle partition/EOF edge cases.
@@ -321,8 +349,10 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
         case PartitionIterator::AdvanceResult::kAdvanced:
             break;
         case PartitionIterator::AdvanceResult::kNewPartition:
-            // We've advanced to a new partition, reset the state of every function.
-            for (auto&& [_, function] : _executableOutputs) {
+            // We've advanced to a new partition, reset the state of every function as well as the
+            // memory tracker.
+            _memoryTracker.reset();
+            for (auto&& [fieldName, function] : _executableOutputs) {
                 function->reset();
             }
             break;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h
index 074857b63e0..ca21d04cafa 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.h
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.h
@@ -33,6 +33,7 @@
 #include "mongo/db/pipeline/accumulator.h"
 #include "mongo/db/pipeline/document_source.h"
 #include "mongo/db/pipeline/document_source_set_window_fields_gen.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
 #include "mongo/db/pipeline/window_function/partition_iterator.h"
 #include "mongo/db/pipeline/window_function/window_bounds.h"
 #include "mongo/db/pipeline/window_function/window_function_exec.h"
@@ -85,17 +86,18 @@ public:
     static boost::intrusive_ptr<DocumentSource> createFromBson(
         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
 
-
     DocumentSourceInternalSetWindowFields(
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         boost::optional<boost::intrusive_ptr<Expression>> partitionBy,
         const boost::optional<SortPattern>& sortBy,
-        std::vector<WindowFunctionStatement> outputFields)
+        std::vector<WindowFunctionStatement> outputFields,
+        size_t maxMemoryBytes)
         : DocumentSource(kStageName, expCtx),
           _partitionBy(partitionBy),
           _sortBy(std::move(sortBy)),
          _outputFields(std::move(outputFields)),
-          _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy) {}
+          _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy),
+          _memoryTracker{false /* allowDiskUse */, maxMemoryBytes} {};
 
     StageConstraints constraints(Pipeline::SplitState pipeState) const final {
         return StageConstraints(StreamType::kBlocking,
@@ -129,7 +131,6 @@ public:
     }
 
 private:
-
     DocumentSource::GetNextResult getNextInput();
     void initialize();
 
     boost::optional<boost::intrusive_ptr<Expression>> _partitionBy;
@@ -137,9 +138,9 @@ private:
     std::vector<WindowFunctionStatement> _outputFields;
     PartitionIterator _iterator;
     StringMap<std::unique_ptr<WindowFunctionExec>> _executableOutputs;
+    MemoryUsageTracker _memoryTracker;
     bool _init = false;
     bool _eof = false;
-    size_t _maxMemory = 0;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/memory_usage_tracker.h b/src/mongo/db/pipeline/memory_usage_tracker.h
new file mode 100644
index 00000000000..76de09383ac
--- /dev/null
+++ b/src/mongo/db/pipeline/memory_usage_tracker.h
@@ -0,0 +1,140 @@
+/**
+ *    Copyright (C) 2021-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/util/string_map.h"
+
+namespace mongo {
+
+/**
+ * This is a utility class for tracking memory usage across multiple arbitrary operators or
+ * functions, which are identified by their string names.
+ */
+class MemoryUsageTracker {
+public:
+    class PerFunctionMemoryTracker {
+    public:
+        PerFunctionMemoryTracker() = default;
+
+        void update(int diff) {
+            _currentMemoryBytes += diff;
+            if (_currentMemoryBytes > _maxMemoryBytes)
+                _maxMemoryBytes = _currentMemoryBytes;
+        }
+
+        void set(uint64_t total) {
+            if (total > _maxMemoryBytes)
+                _maxMemoryBytes = total;
+            _currentMemoryBytes = total;
+        }
+
+        auto currentMemoryBytes() const {
+            return _currentMemoryBytes;
+        }
+
+        auto maxMemoryBytes() const {
+            return _maxMemoryBytes;
+        }
+
+    private:
+        // Maximum memory consumption thus far observed for this function.
+        uint64_t _maxMemoryBytes = 0;
+        // Tracks the current memory footprint.
+        uint64_t _currentMemoryBytes = 0;
+    };
+
+    MemoryUsageTracker(bool allowDiskUse, size_t maxMemoryUsageBytes)
+        : _allowDiskUse(allowDiskUse), _maxAllowedMemoryUsageBytes(maxMemoryUsageBytes) {}
+
+    /**
+     * Sets the new total for 'functionName', and updates the current total memory usage.
+     */
+    void set(StringData functionName, uint64_t total) {
+        auto oldFuncUsage = _functionMemoryTracker[functionName].currentMemoryBytes();
+        _functionMemoryTracker[functionName].set(total);
+        _memoryUsageBytes += total - oldFuncUsage;
+    }
+
+    /**
+     * Sets the new current memory usage in bytes.
+     */
+    void set(uint64_t total) {
+        _memoryUsageBytes = total;
+    }
+
+    /**
+     * Resets both the total memory usage as well as the per-function memory usage.
+     */
+    void reset() {
+        _memoryUsageBytes = 0;
+        for (auto& [_, funcTracker] : _functionMemoryTracker) {
+            funcTracker.set(0);
+        }
+    }
+
+    /**
+     * Provides read-only access to the function memory tracker for 'name'.
+     */
+    auto operator[](StringData name) const {
+        tassert(5466400,
+                str::stream() << "Invalid call to memory usage tracker, could not find function "
+                              << name,
+                _functionMemoryTracker.find(name) != _functionMemoryTracker.end());
+        return _functionMemoryTracker.at(name);
+    }
+
+    /**
+     * Updates the memory usage for 'name' by adding 'diff' to the current memory usage for that
+     * function. Also updates the total memory usage.
+     */
+    void update(StringData name, int diff) {
+        _functionMemoryTracker[name].update(diff);
+        _memoryUsageBytes += diff;
+    }
+
+    auto currentMemoryBytes() const {
+        return _memoryUsageBytes;
+    }
+
+    const bool _allowDiskUse;
+    const size_t _maxAllowedMemoryUsageBytes;
+
+private:
+    // Tracks current memory used.
+    size_t _memoryUsageBytes = 0;
+
+    // Tracks memory consumption per function using the output field name as a key.
+    StringMap<PerFunctionMemoryTracker> _functionMemoryTracker;
+};
+
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
index 99ddf7fb4e2..38a61a000bd 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -63,6 +63,7 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
             [&](const int& upperIndex) { _upperBound = upperIndex; },
         },
         bounds.upper);
+    _memUsageBytes = sizeof(*this);
 }
 
 void WindowFunctionExecRemovableDocument::initialize() {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
index bc0c81e094a..b8564eda950 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -69,12 +69,14 @@ public:
                                         WindowBounds::DocumentBased bounds)
         : WindowFunctionExecRemovableDocument(iter, std::move(input), std::move(function), bounds) {
         _sortBy = std::move(sortBy);
+        _memUsageBytes = sizeof(*this);
     }
 
     void reset() final {
         _function->reset();
         _values = std::queue<Value>();
         _initialized = false;
+        _memUsageBytes = sizeof(*this);
     }
 
 private:
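
A companion sketch of the enforcement side, in the spirit of the memory_limit.js test above.
The collection name, cap value, and document shape are illustrative, and it assumes the server
parameter can be set at runtime; the knob name and error code are the ones used in this commit.
Lowering internalDocumentSourceSetWindowFieldsMaxMemoryBytes makes a document-buffering window
function fail with code 5414201 rather than grow without bound, since $setWindowFields cannot
spill to disk:

    // Cap the stage at 1KB (illustrative value), then buffer whole documents.
    assert.commandWorked(db.adminCommand(
        {setParameter: 1, internalDocumentSourceSetWindowFieldsMaxMemoryBytes: 1024}));
    const res = db.runCommand({
        aggregate: "sales",  // hypothetical collection
        pipeline: [{
            $setWindowFields: {
                sortBy: {_id: 1},
                output: {docs: {$push: "$$ROOT", window: {documents: [-5, 4]}}}
            }
        }],
        cursor: {}
    });
    // Fails with "Exceeded memory limit in DocumentSourceSetWindowFields".
    assert.commandFailedWithCode(res, 5414201);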