summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2021-03-31 14:35:10 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-30 14:22:03 +0000
commitf36132ec219dd8cfdab420403f10d18666249947 (patch)
treeb670698b942e7dbf84e01ae9e1afd061680ec457
parent3295126e8a081ca57af1a07c6c582fac7a825efd (diff)
downloadmongo-f36132ec219dd8cfdab420403f10d18666249947.tar.gz
SERVER-54664 Add per-function execution stats to $setWindowFields stage
-rw-r--r--jstests/aggregation/sources/setWindowFields/explain.js149
-rw-r--r--jstests/aggregation/sources/setWindowFields/memory_limit.js1
-rw-r--r--jstests/noPassthrough/explain_group_stage_exec_stats.js9
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp83
-rw-r--r--src/mongo/db/pipeline/document_source_group.h26
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.cpp48
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.h11
-rw-r--r--src/mongo/db/pipeline/memory_usage_tracker.h140
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp1
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h2
10 files changed, 382 insertions, 88 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/explain.js b/jstests/aggregation/sources/setWindowFields/explain.js
new file mode 100644
index 00000000000..fbae93889d8
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/explain.js
@@ -0,0 +1,149 @@
+/**
+ * Tests that $setWindowFields stage reports memory footprint per function when explain is run
+ * with verbosities "executionStats" and "allPlansExecution".
+ *
+ * @tags: [assumes_against_mongod_not_mongos]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/analyze_plan.js"); // For getAggPlanStages().
+
+const coll = db[jsTestName()];
+coll.drop();
+const bigStr = Array(1025).toString(); // 1KB of ','
+const nDocs = 1000;
+const nPartitions = 50;
+const docSize = 8 + 8 + 1024;
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+ .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+let bulk = coll.initializeUnorderedBulkOp();
+for (let i = 1; i <= nDocs; i++) {
+ bulk.insert({_id: i, key: i % nPartitions, bigStr: bigStr});
+}
+assert.commandWorked(bulk.execute());
+
+/**
+ * Checks that the execution stats in the explain output for a $setWindowFields stage are as
+ * expected.
+ * - 'stages' is an array of the explain output of $setWindowFields stages.
+ * - 'expectedFunctionMemUsages' is used to check the memory footprint stats for each function.
+ * - 'verbosity' indicates the explain verbosity used.
+ */
+function checkExplainResult(pipeline, expectedFunctionMemUsages, verbosity) {
+ const stages =
+ getAggPlanStages(coll.explain(verbosity).aggregate(pipeline), "$_internalSetWindowFields");
+ for (let stage of stages) {
+ assert(stage.hasOwnProperty("$_internalSetWindowFields"), stage);
+
+ if (verbosity === "executionStats" || verbosity === "allPlansExecution") {
+ assert(stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
+ const maxFunctionMemUsages = stage["maxFunctionMemoryUsageBytes"];
+ for (let field of Object.keys(maxFunctionMemUsages)) {
+ // Ensures that the expected functions are all included and the corresponding
+ // memory usage is in a reasonable range.
+ if (expectedFunctionMemUsages.hasOwnProperty(field)) {
+ assert.gt(maxFunctionMemUsages[field],
+ expectedFunctionMemUsages[field],
+ "mismatch for function '" + field + "': " + tojson(stage));
+ assert.lt(maxFunctionMemUsages[field],
+ 2 * expectedFunctionMemUsages[field],
+ "mismatch for function '" + field + "': " + tojson(stage));
+ }
+ }
+ } else {
+ assert(!stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
+ }
+ }
+}
+
+(function testQueryPlannerVerbosity() {
+ const pipeline = [
+ {
+ $setWindowFields:
+ {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}}
+ },
+ ];
+ const stages = getAggPlanStages(coll.explain("queryPlanner").aggregate(pipeline),
+ "$_internalSetWindowFields");
+ checkExplainResult(stages, {}, "queryPlanner");
+})();
+
+(function testUnboundedMemUsage() {
+ let pipeline = [
+ {
+ $setWindowFields:
+ {output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}}
+ },
+ ];
+
+ // The $setWindowFields stage "streams" one partition at a time, so there's only one instance of
+ // each function. For the default [unbounded, unbounded] window type, each function uses memory
+ // usage comparable to it's $group counterpart.
+ let expectedFunctionMemUsages = {
+ count: 60,
+ push: nDocs * 1024,
+ set: 1024,
+ };
+
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+
+ // Test that the memory footprint is reduced with partitioning.
+ pipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: "$key",
+ output: {count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}
+ }
+ },
+ ];
+ expectedFunctionMemUsages = {
+ count: 60,
+ push: (nDocs / nPartitions) * 1024,
+ set: 1024,
+ };
+
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+})();
+
+(function testSlidingWindowMemUsage() {
+ const windowSize = 10;
+ let pipeline = [
+ {
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+ }
+ },
+ ];
+ const expectedFunctionMemUsages = {
+ runningSum: windowSize * 16 +
+ 160, // 10x64-bit integer values per window, and 160 for the $sum state.
+ };
+
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+
+ // Adding partitioning doesn't change the peak memory usage.
+ pipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: "$key",
+ sortBy: {_id: 1},
+ output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+ }
+ },
+ ];
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "executionStats");
+ checkExplainResult(pipeline, expectedFunctionMemUsages, "allPlansExecution");
+})();
+}());
diff --git a/jstests/aggregation/sources/setWindowFields/memory_limit.js b/jstests/aggregation/sources/setWindowFields/memory_limit.js
index 6e705beaa4b..f960780d06b 100644
--- a/jstests/aggregation/sources/setWindowFields/memory_limit.js
+++ b/jstests/aggregation/sources/setWindowFields/memory_limit.js
@@ -58,6 +58,7 @@ assert.commandWorked(coll.runCommand({
}],
cursor: {}
}));
+
// Test that the query fails with a window function that stores documents.
assert.commandFailedWithCode(coll.runCommand({
aggregate: coll.getName(),
diff --git a/jstests/noPassthrough/explain_group_stage_exec_stats.js b/jstests/noPassthrough/explain_group_stage_exec_stats.js
index 55be2c77dc2..257d9676baa 100644
--- a/jstests/noPassthrough/explain_group_stage_exec_stats.js
+++ b/jstests/noPassthrough/explain_group_stage_exec_stats.js
@@ -45,7 +45,7 @@ const expectedAccumMemUsages = {
function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldSpillToDisk) {
// Tracks the memory usage per accumulator in total as 'stages' passed in could be the explain
// output across a cluster.
- let totalAccmMemoryUsageBytes = 0;
+ let totalAccumMemoryUsageBytes = 0;
for (let stage of stages) {
assert(stage.hasOwnProperty("$group"), stage);
@@ -54,7 +54,7 @@ function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldS
assert(stage.hasOwnProperty("maxAccumulatorMemoryUsageBytes"), stage);
const maxAccmMemUsages = stage["maxAccumulatorMemoryUsageBytes"];
for (let field of Object.keys(maxAccmMemUsages)) {
- totalAccmMemoryUsageBytes += maxAccmMemUsages[field];
+ totalAccumMemoryUsageBytes += maxAccmMemUsages[field];
// Ensures that the expected accumulators are all included and the corresponding
// memory usage is in a reasonable range. Note that in debug mode, data will be
@@ -69,8 +69,11 @@ function checkGroupStages(stages, expectedAccumMemUsages, isExecExplain, shouldS
}
}
+ // Add some wiggle room to the total memory used compared to the limit parameter since the check
+ // for spilling to disk happens after each document is processed.
if (shouldSpillToDisk)
- assert.gt(maxMemoryLimitForGroupStage, totalAccmMemoryUsageBytes);
+ assert.gt(
+ maxMemoryLimitForGroupStage + 3 * 1024, totalAccumMemoryUsageBytes, tojson(stages));
}
let groupStages = getAggPlanStages(coll.explain("executionStats").aggregate(pipeline), "$group");
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 5326cf864af..c4ca7bc932f 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -133,39 +133,37 @@ const char* DocumentSourceGroup::getSourceName() const {
return kStageName.rawData();
}
-bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory) {
- if (!_memoryTracker.allowDiskUse &&
- (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) {
- _memoryTracker.memoryUsageBytes -= saveMemory();
+bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory() {
+ if (!_memoryTracker._allowDiskUse &&
+ (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes)) {
+ freeMemory();
}
- if (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) {
+ if (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes) {
uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
- _memoryTracker.allowDiskUse);
- _memoryTracker.memoryUsageBytes = 0;
+ _memoryTracker._allowDiskUse);
+ _memoryTracker.set(0);
return true;
}
return false;
}
-int DocumentSourceGroup::freeMemory() {
+void DocumentSourceGroup::freeMemory() {
invariant(_groups);
- int totalMemorySaved = 0;
for (auto&& group : *_groups) {
for (size_t i = 0; i < group.second.size(); i++) {
- auto prevMemUsage = group.second[i]->getMemUsage();
+ // Subtract the current usage.
+ _memoryTracker.update(_accumulatedFields[i].fieldName,
+ -1 * group.second[i]->getMemUsage());
+
group.second[i]->reduceMemoryConsumptionIfAble();
- auto memorySaved = prevMemUsage - group.second[i]->getMemUsage();
// Update the memory usage for this AccumulationStatement.
- _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes -= memorySaved;
- // Update the memory usage for this group.
- totalMemorySaved += memorySaved;
+ _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage());
}
}
- return totalMemorySaved;
}
DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
@@ -306,16 +304,12 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
MutableDocument out;
out[getSourceName()] = Value(insides.freeze());
- if (explain >= ExplainOptions::Verbosity::kExecStats) {
+ if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
MutableDocument md;
- invariant(_accumulatedFields.size() == _memoryTracker.accumStatementMemoryBytes.size());
for (size_t i = 0; i < _accumulatedFields.size(); i++) {
- md[_accumulatedFields[i].fieldName] = _stats.usedDisk
- ? Value(static_cast<long long>(
- _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes))
- : Value(static_cast<long long>(
- _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes));
+ md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>(
+ _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes()));
}
out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue());
@@ -390,9 +384,8 @@ intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
groupStage->setIdExpression(groupByExpression);
for (auto&& statement : accumulationStatements) {
groupStage->addAccumulator(statement);
+ groupStage->_memoryTracker.set(statement.fieldName, 0);
}
- groupStage->_memoryTracker.accumStatementMemoryBytes.resize(accumulationStatements.size(),
- {0, 0});
return groupStage;
}
@@ -498,10 +491,9 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
// Any other field will be treated as an accumulator specification.
groupStage->addAccumulator(
AccumulationStatement::parseAccumulationStatement(expCtx.get(), groupField, vps));
+ groupStage->_memoryTracker.set(pFieldName, 0);
}
}
- groupStage->_memoryTracker.accumStatementMemoryBytes.resize(
- groupStage->getAccumulatedFields().size(), {0, 0});
uassert(
15955, "a group specification must include an _id", !groupStage->_idExpressions.empty());
@@ -546,7 +538,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
GetNextResult input = pSource->getNext();
for (; input.isAdvanced(); input = pSource->getNext()) {
- if (shouldSpillWithAttemptToSaveMemory([this]() { return freeMemory(); })) {
+ if (shouldSpillWithAttemptToSaveMemory()) {
_sortedFiles.push_back(spill());
}
@@ -564,7 +556,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
vector<uint64_t> oldAccumMemUsage(numAccumulators, 0);
if (inserted) {
- _memoryTracker.memoryUsageBytes += id.getApproximateSize();
+ _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize());
// Initialize and add the accumulators
Value expandedId = expandId(id);
@@ -581,8 +573,8 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
} else {
for (size_t i = 0; i < group.size(); i++) {
// subtract old mem usage. New usage added back after processing.
- _memoryTracker.memoryUsageBytes -= group[i]->getMemUsage();
- oldAccumMemUsage[i] = group[i]->getMemUsage();
+ _memoryTracker.update(_accumulatedFields[i].fieldName,
+ -1 * group[i]->getMemUsage());
}
}
@@ -593,18 +585,16 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
group[i]->process(
_accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
_doingMerge);
-
- _memoryTracker.memoryUsageBytes += group[i]->getMemUsage();
- _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes +=
- group[i]->getMemUsage() - oldAccumMemUsage[i];
+ _memoryTracker.update(_accumulatedFields[i].fieldName, group[i]->getMemUsage());
}
if (kDebugBuild && !storageGlobalParams.readOnly) {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
- if (!inserted && // is a dup
- !pExpCtx->inMongos && // can't spill to disk in mongos
- !_memoryTracker.allowDiskUse && // don't change behavior when testing external sort
- _sortedFiles.size() < 20) { // don't open too many FDs
+ if (!inserted && // is a dup
+ !pExpCtx->inMongos && // can't spill to disk in mongos
+ !_memoryTracker
+ ._allowDiskUse && // don't change behavior when testing external sort
+ _sortedFiles.size() < 20) { // don't open too many FDs
_sortedFiles.push_back(spill());
}
@@ -697,14 +687,10 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
metricsCollector.incrementSorterSpills(1);
_groups->clear();
- // Update the max memory consumption per accumulation statement if the previous max was exceeded
- // prior to spilling. Then zero out the current per-accumulation statement memory consumption,
- // as the memory has been freed by spilling.
- for (size_t i = 0; i < _memoryTracker.accumStatementMemoryBytes.size(); i++) {
- _memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes =
- std::max(_memoryTracker.accumStatementMemoryBytes[i].maxMemoryBytes,
- _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes);
- _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes = 0;
+ // Zero out the current per-accumulation statement memory consumption, as the memory has been
+ // freed by spilling.
+ for (auto accum : _accumulatedFields) {
+ _memoryTracker.set(accum.fieldName, 0);
}
Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
@@ -788,9 +774,8 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr
copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse(
pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps);
mergingGroup->addAccumulator(copiedAccumulatedField);
+ mergingGroup->_memoryTracker.set(copiedAccumulatedField.fieldName, 0);
}
- mergingGroup->_memoryTracker.accumStatementMemoryBytes.resize(_accumulatedFields.size(),
- {0, 0});
// {shardsStage, mergingStage, sortPattern}
return DistributedPlanLogic{this, mergingGroup, boost::none};
@@ -889,7 +874,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
}
size_t DocumentSourceGroup::getMaxMemoryUsageBytes() const {
- return _memoryTracker.maxMemoryUsageBytes;
+ return _memoryTracker._maxAllowedMemoryUsageBytes;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 7497b597c75..f42945fa1a1 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -35,6 +35,7 @@
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/pipeline/transformer_interface.h"
#include "mongo/db/sorter/sorter.h"
@@ -193,24 +194,6 @@ protected:
void doDispose() final;
private:
- struct MemoryUsageTracker {
- struct AccumStatementMemoryTracker {
- // Maximum memory consumption thus far observed. Only updated when data is spilled to
- // disk during execution of the $group.
- uint64_t maxMemoryBytes;
- // Tracks the current memory footprint.
- uint64_t currentMemoryBytes;
- };
-
- const bool allowDiskUse;
- const size_t maxMemoryUsageBytes;
-
- // Tracks current memory used. This variable will be reset if data is spilled to disk.
- size_t memoryUsageBytes = 0;
- // Tracks memory consumption per accumulation statement.
- std::vector<AccumStatementMemoryTracker> accumStatementMemoryBytes;
- };
-
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
@@ -247,7 +230,7 @@ private:
* If we ran out of memory, finish all the pending operations so that some memory
* can be freed.
*/
- int freeMemory();
+ void freeMemory();
Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
@@ -269,12 +252,11 @@ private:
/**
* Cleans up any pending memory usage. Throws error, if memory usage is above
- * 'maxMemoryUsageBytes' and cannot spill to disk. The 'saveMemory' function should return
- * the amount of memory saved by the cleanup.
+ * 'maxMemoryUsageBytes' and cannot spill to disk.
*
* Returns true, if the caller should spill to disk, false otherwise.
*/
- bool shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory);
+ bool shouldSpillWithAttemptToSaveMemory();
std::vector<AccumulationStatement> _accumulatedFields;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index 49700ee71a6..9a3e3d3498f 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -207,7 +207,11 @@ list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::create(
// $_internalSetWindowFields
result.push_back(make_intrusive<DocumentSourceInternalSetWindowFields>(
- expCtx, simplePartitionByExpr, sortBy, outputFields));
+ expCtx,
+ simplePartitionByExpr,
+ sortBy,
+ outputFields,
+ internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load()));
// $unset
if (complexPartitionBy) {
@@ -246,7 +250,21 @@ Value DocumentSourceInternalSetWindowFields::serialize(
}
spec[SetWindowFieldsSpec::kOutputFieldName] = output.freezeToValue();
- return Value(DOC(kStageName << spec.freeze()));
+ MutableDocument out;
+ out[getSourceName()] = Value(spec.freeze());
+
+ if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
+ MutableDocument md;
+
+ for (auto&& [fieldName, function] : _executableOutputs) {
+ md[fieldName] =
+ Value(static_cast<long long>(_memoryTracker[fieldName].maxMemoryBytes()));
+ }
+
+ out["maxFunctionMemoryUsageBytes"] = Value(md.freezeToValue());
+ }
+
+ return Value(out.freezeToValue());
}
boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::createFromBson(
@@ -278,14 +296,18 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::crea
}
return make_intrusive<DocumentSourceInternalSetWindowFields>(
- expCtx, partitionBy, sortBy, outputFields);
+ expCtx,
+ partitionBy,
+ sortBy,
+ outputFields,
+ internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load());
}
void DocumentSourceInternalSetWindowFields::initialize() {
- _maxMemory = internalDocumentSourceSetWindowFieldsMaxMemoryBytes.load();
for (auto& wfs : _outputFields) {
_executableOutputs[wfs.fieldName] =
WindowFunctionExec::create(pExpCtx.get(), &_iterator, wfs, _sortBy);
+ _memoryTracker.set(wfs.fieldName, _executableOutputs[wfs.fieldName]->getApproximateSize());
}
_init = true;
}
@@ -307,13 +329,19 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
// Populate the output document with the result from each window function.
MutableDocument addFieldsSpec;
- size_t functionMemUsage = 0;
for (auto&& [fieldName, function] : _executableOutputs) {
+ auto oldIteratorMemUsage = _iterator.getApproximateSize();
addFieldsSpec.addField(fieldName, function->getNext());
- functionMemUsage += function->getApproximateSize();
+
+ // Update the memory usage for this function after getNext().
+ _memoryTracker.set(fieldName, function->getApproximateSize());
+ // Account for the additional memory in the iterator cache.
+ _memoryTracker.set(_memoryTracker.currentMemoryBytes() +
+ (_iterator.getApproximateSize() - oldIteratorMemUsage));
+
uassert(5414201,
"Exceeded memory limit in DocumentSourceSetWindowFields",
- functionMemUsage + _iterator.getApproximateSize() < _maxMemory);
+ _memoryTracker.currentMemoryBytes() < _memoryTracker._maxAllowedMemoryUsageBytes);
}
// Advance the iterator and handle partition/EOF edge cases.
@@ -321,8 +349,10 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
case PartitionIterator::AdvanceResult::kAdvanced:
break;
case PartitionIterator::AdvanceResult::kNewPartition:
- // We've advanced to a new partition, reset the state of every function.
- for (auto&& [_, function] : _executableOutputs) {
+ // We've advanced to a new partition, reset the state of every function as well as the
+ // memory tracker.
+ _memoryTracker.reset();
+ for (auto&& [fieldName, function] : _executableOutputs) {
function->reset();
}
break;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h
index 074857b63e0..ca21d04cafa 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.h
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.h
@@ -33,6 +33,7 @@
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_set_window_fields_gen.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/pipeline/window_function/partition_iterator.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
#include "mongo/db/pipeline/window_function/window_function_exec.h"
@@ -85,17 +86,18 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
DocumentSourceInternalSetWindowFields(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<boost::intrusive_ptr<Expression>> partitionBy,
const boost::optional<SortPattern>& sortBy,
- std::vector<WindowFunctionStatement> outputFields)
+ std::vector<WindowFunctionStatement> outputFields,
+ size_t maxMemoryBytes)
: DocumentSource(kStageName, expCtx),
_partitionBy(partitionBy),
_sortBy(std::move(sortBy)),
_outputFields(std::move(outputFields)),
- _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy) {}
+ _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy),
+ _memoryTracker{false /* allowDiskUse */, maxMemoryBytes} {};
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return StageConstraints(StreamType::kBlocking,
@@ -129,7 +131,6 @@ public:
}
private:
- DocumentSource::GetNextResult getNextInput();
void initialize();
boost::optional<boost::intrusive_ptr<Expression>> _partitionBy;
@@ -137,9 +138,9 @@ private:
std::vector<WindowFunctionStatement> _outputFields;
PartitionIterator _iterator;
StringMap<std::unique_ptr<WindowFunctionExec>> _executableOutputs;
+ MemoryUsageTracker _memoryTracker;
bool _init = false;
bool _eof = false;
- size_t _maxMemory = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/memory_usage_tracker.h b/src/mongo/db/pipeline/memory_usage_tracker.h
new file mode 100644
index 00000000000..76de09383ac
--- /dev/null
+++ b/src/mongo/db/pipeline/memory_usage_tracker.h
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/util/string_map.h"
+
+namespace mongo {
+
+/**
+ * This is a utility class for tracking memory usage across multiple arbitrary operators or
+ * functions, which are identified by their string names.
+ */
+class MemoryUsageTracker {
+public:
+ class PerFunctionMemoryTracker {
+ public:
+ PerFunctionMemoryTracker() = default;
+
+ void update(int diff) {
+ _currentMemoryBytes += diff;
+ if (_currentMemoryBytes > _maxMemoryBytes)
+ _maxMemoryBytes = _currentMemoryBytes;
+ }
+
+ void set(uint64_t total) {
+ if (total > _maxMemoryBytes)
+ _maxMemoryBytes = total;
+ _currentMemoryBytes = total;
+ }
+
+ auto currentMemoryBytes() const {
+ return _currentMemoryBytes;
+ }
+
+ auto maxMemoryBytes() const {
+ return _maxMemoryBytes;
+ }
+
+ private:
+ // Maximum memory consumption thus far observed for this function.
+ uint64_t _maxMemoryBytes = 0;
+ // Tracks the current memory footprint.
+ uint64_t _currentMemoryBytes = 0;
+ };
+
+ MemoryUsageTracker(bool allowDiskUse, size_t maxMemoryUsageBytes)
+ : _allowDiskUse(allowDiskUse), _maxAllowedMemoryUsageBytes(maxMemoryUsageBytes) {}
+
+ /**
+ * Sets the new total for 'functionName', and updates the current total memory usage.
+ */
+ void set(StringData functionName, uint64_t total) {
+ auto oldFuncUsage = _functionMemoryTracker[functionName].currentMemoryBytes();
+ _functionMemoryTracker[functionName].set(total);
+ _memoryUsageBytes += total - oldFuncUsage;
+ }
+
+ /**
+ * Sets the new current memory usage in bytes.
+ */
+ void set(uint64_t total) {
+ _memoryUsageBytes = total;
+ }
+
+ /**
+ * Resets both the total memory usage as well as the per-function memory usage.
+ */
+ void reset() {
+ _memoryUsageBytes = 0;
+ for (auto& [_, funcTracker] : _functionMemoryTracker) {
+ funcTracker.set(0);
+ }
+ }
+
+ /**
+ * Provides read-only access to the function memory tracker for 'name'.
+ */
+ auto operator[](StringData name) const {
+ tassert(5466400,
+ str::stream() << "Invalid call to memory usage tracker, could not find function "
+ << name,
+ _functionMemoryTracker.find(name) != _functionMemoryTracker.end());
+ return _functionMemoryTracker.at(name);
+ }
+
+ /**
+ * Updates the memory usage for 'functionName' by adding 'diff' to the current memory usage for
+ * that function. Also updates the total memory usage.
+ */
+ void update(StringData name, int diff) {
+ _functionMemoryTracker[name].update(diff);
+ _memoryUsageBytes += diff;
+ }
+
+ auto currentMemoryBytes() const {
+ return _memoryUsageBytes;
+ }
+
+ const bool _allowDiskUse;
+ const size_t _maxAllowedMemoryUsageBytes;
+
+private:
+ // Tracks current memory used.
+ size_t _memoryUsageBytes = 0;
+
+ // Tracks memory consumption per function using the output field name as a key.
+ StringMap<PerFunctionMemoryTracker> _functionMemoryTracker;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
index 99ddf7fb4e2..38a61a000bd 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -63,6 +63,7 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
[&](const int& upperIndex) { _upperBound = upperIndex; },
},
bounds.upper);
+ _memUsageBytes = sizeof(*this);
}
void WindowFunctionExecRemovableDocument::initialize() {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
index bc0c81e094a..b8564eda950 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -69,12 +69,14 @@ public:
WindowBounds::DocumentBased bounds)
: WindowFunctionExecRemovableDocument(iter, std::move(input), std::move(function), bounds) {
_sortBy = std::move(sortBy);
+ _memUsageBytes = sizeof(*this);
}
void reset() final {
_function->reset();
_values = std::queue<Value>();
_initialized = false;
+ _memUsageBytes = sizeof(*this);
}
private: