summary refs log tree commit diff
diff options
context:
space:
mode:
author Nick Zolnierz <nicholas.zolnierz@mongodb.com> 2021-04-28 17:24:33 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-24 14:03:24 +0000
commit 1a9a9b424d19c914118477bbd245bf3fbf96253d (patch)
tree 4e435e1d17b0c3a307281c4297e77b64ac7307c2
parent 50547a878aaa67722f69c05b05a35236f8f0def9 (diff)
download mongo-1a9a9b424d19c914118477bbd245bf3fbf96253d.tar.gz
SERVER-55786 Update PartitionIterator memory usage when documents are released
This commit also handles the inflation of Documents as expressions are evaluated due to the internal caching in the Document class. (cherry picked from commit be41c9d532346c873c5b909a18fe92e8885337cb)
-rw-r--r-- jstests/aggregation/extras/window_function_helpers.js 53
-rw-r--r-- jstests/aggregation/sources/setWindowFields/explain.js 39
-rw-r--r-- jstests/aggregation/sources/setWindowFields/memory_limit.js 26
-rw-r--r-- jstests/aggregation/sources/setWindowFields/spill_to_disk.js 56
-rw-r--r-- src/mongo/db/exec/document_value/document.cpp 8
-rw-r--r-- src/mongo/db/exec/document_value/document.h 14
-rw-r--r-- src/mongo/db/exec/document_value/document_internal.h 5
-rw-r--r-- src/mongo/db/pipeline/SConscript 1
-rw-r--r-- src/mongo/db/pipeline/document_source_group.cpp 8
-rw-r--r-- src/mongo/db/pipeline/document_source_set_window_fields.cpp 25
-rw-r--r-- src/mongo/db/pipeline/document_source_set_window_fields.h 6
-rw-r--r-- src/mongo/db/pipeline/memory_usage_tracker.h 91
-rw-r--r-- src/mongo/db/pipeline/memory_usage_tracker_test.cpp 123
-rw-r--r-- src/mongo/db/pipeline/window_function/partition_iterator.cpp 17
-rw-r--r-- src/mongo/db/pipeline/window_function/partition_iterator.h 14
-rw-r--r-- src/mongo/db/pipeline/window_function/partition_iterator_test.cpp 196
-rw-r--r-- src/mongo/db/pipeline/window_function/spillable_cache.cpp 24
-rw-r--r-- src/mongo/db/pipeline/window_function/spillable_cache.h 23
-rw-r--r-- src/mongo/db/pipeline/window_function/spillable_cache_test.cpp 5
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec.cpp 38
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec.h 45
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_derivative.cpp 9
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_derivative.h 11
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp 11
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_first_last.h 24
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp 13
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h 22
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h 12
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp 45
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp 9
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h 11
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp 10
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h 12
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp 76
34 files changed, 677 insertions, 405 deletions
diff --git a/jstests/aggregation/extras/window_function_helpers.js b/jstests/aggregation/extras/window_function_helpers.js
index b62c730f8e9..ed44e800691 100644
--- a/jstests/aggregation/extras/window_function_helpers.js
+++ b/jstests/aggregation/extras/window_function_helpers.js
@@ -1,4 +1,6 @@
load("jstests/aggregation/extras/utils.js"); // arrayEq
+load("jstests/libs/analyze_plan.js"); // For getAggPlanStages().
+
/**
* Create a collection of tickers and prices.
*/
@@ -170,6 +172,28 @@ function assertResultsEqual(wfRes, index, groupRes, accum) {
"Window function result for index " + index + ": " + tojson(wfRes));
}
+function assertExplainResult(explainResult) {
+ const stages = getAggPlanStages(explainResult, "$_internalSetWindowFields");
+ for (let stage of stages) {
+ assert(stage.hasOwnProperty("$_internalSetWindowFields"), stage);
+
+ assert(stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
+ const maxFunctionMemUsages = stage["maxFunctionMemoryUsageBytes"];
+ for (let field of Object.keys(maxFunctionMemUsages)) {
+ assert.gte(maxFunctionMemUsages[field],
+ 0,
+ "invalid memory usage for '" + field + "': " + tojson(stage));
+ }
+ assert.gt(
+ stage["maxTotalMemoryUsageBytes"], 0, "Incorrect total mem usage: " + tojson(stage));
+ // No test should be using more than 1GB of memory. This is mostly a sanity check that
+ // integer underflow doesn't occur.
+ assert.lt(stage["maxTotalMemoryUsageBytes"],
+ 1 * 1024 * 1024 * 1024,
+ "Incorrect total mem usage: " + tojson(stage));
+ }
+}
+
/**
* Runs the given 'accum' as both a window function and its equivalent accumulator in $group across
* various combinations of partitioning and window bounds. Asserts that the results are consistent.
@@ -182,19 +206,17 @@ function testAccumAgainstGroup(coll, accum, onNoResults = null) {
documentBounds.forEach(function(bounds, index) {
jsTestLog("Testing accumulator " + accum + " against " + tojson(partition) +
" partition and [" + bounds + "] bounds");
- const wfResults =
- coll.aggregate(
- [
- {
- $setWindowFields: {
- partitionBy: partition,
- sortBy: {_id: 1},
- output: {res: {[accum]: "$price", window: {documents: bounds}}}
- },
- },
- ],
- {allowDiskUse: true})
- .toArray();
+
+ const pipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: partition,
+ sortBy: {_id: 1},
+ output: {res: {[accum]: "$price", window: {documents: bounds}}}
+ },
+ },
+ ];
+ const wfResults = coll.aggregate(pipeline, {allowDiskUse: true}).toArray();
for (let index = 0; index < wfResults.length; index++) {
const wfRes = wfResults[index];
@@ -223,6 +245,11 @@ function testAccumAgainstGroup(coll, accum, onNoResults = null) {
assertResultsEqual(wfRes, index, groupRes, accum);
}
+ // Run the same pipeline with explain verbosity "executionStats" and verify that the
+ // reported metrics are sensible.
+ assertExplainResult(
+ coll.explain("executionStats").aggregate(pipeline, {allowDiskUse: true}));
+
jsTestLog("Done");
});
diff --git a/jstests/aggregation/sources/setWindowFields/explain.js b/jstests/aggregation/sources/setWindowFields/explain.js
index ee95c9b358a..b7542771944 100644
--- a/jstests/aggregation/sources/setWindowFields/explain.js
+++ b/jstests/aggregation/sources/setWindowFields/explain.js
@@ -132,9 +132,7 @@ function checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotalMe
set: 1144, // 1024 for the string, rest is constant state.
};
// Add one document for the NextPartitionState structure.
- // TODO SERVER-55786: Fix memory tracking so that the PartitionIterator max memory is correctly
- // recorded, remove the '-1' in this line.
- expectedTotal = (nDocs / nPartitions - 1) * docSize;
+ expectedTotal = ((nDocs / nPartitions) + 1) * docSize;
for (let func in expectedFunctionMemUsages) {
expectedTotal += expectedFunctionMemUsages[func];
}
@@ -154,22 +152,39 @@ function checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotalMe
{
$setWindowFields: {
sortBy: {_id: 1},
- output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+ output: {runningSum: {$sum: "$_id", window: {documents: [0, 9]}}}
}
},
];
const expectedFunctionMemUsages = {
- // 10x64-bit integer values per window, and 72 and 144 for the $sum accumulator and executor
- // state.
- runningSum: windowSize * 16 + 136 + 144,
+ // 10 integer values per window, with some extra to account for the $sum accumulator and
+ // executor state.
+ runningSum: windowSize * 16 + 120,
};
- // TODO SERVER-55786: Fix memory tracking so that the PartitionIterator max memory is correctly
- // recorded, remove the '-1' in this line.
- let expectedTotal = (numDocsHeld - 1) * docSize;
+ let expectedTotal = windowSize * docSize;
+ for (let func in expectedFunctionMemUsages) {
+ expectedTotal += expectedFunctionMemUsages[func];
+ }
+ checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats");
+ checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution");
+
+ // Test that a window which also looks behind is able to release documents that are no longer
+ // needed, thus reducing the total memory footprint. In this example, only half of the window
+ // will be in memory at any point in time.
+ pipeline = [
+ {
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {runningSum: {$sum: "$_id", window: {documents: [-5, 4]}}}
+ }
+ },
+ ];
+ expectedTotal = (windowSize / 2) * docSize;
for (let func in expectedFunctionMemUsages) {
expectedTotal += expectedFunctionMemUsages[func];
}
+
checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "executionStats");
checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotal, "allPlansExecution");
@@ -202,9 +217,7 @@ function checkExplainResult(pipeline, expectedFunctionMemUsages, expectedTotalMe
// large string.
const expectedFunctionMemUsages = {pushArray: 1024 * maxDocsInWindow * 2};
- // TODO SERVER-55786: Fix memory tracking so that the PartitionIterator max memory is correctly
- // recorded, remove the '-1' in this line.
- let expectedTotal = (maxDocsInWindow - 1) * docSize;
+ let expectedTotal = maxDocsInWindow * docSize;
for (let func in expectedFunctionMemUsages) {
expectedTotal += expectedFunctionMemUsages[func];
}
diff --git a/jstests/aggregation/sources/setWindowFields/memory_limit.js b/jstests/aggregation/sources/setWindowFields/memory_limit.js
index 9048f2c18ac..b18678df911 100644
--- a/jstests/aggregation/sources/setWindowFields/memory_limit.js
+++ b/jstests/aggregation/sources/setWindowFields/memory_limit.js
@@ -27,8 +27,9 @@ setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
1200);
// Create a collection with enough documents in a single partition to go over the memory limit.
-for (let i = 0; i < 10; i++) {
- coll.insert({_id: i, partitionKey: 1, str: "str"});
+const docsPerPartition = 10;
+for (let i = 0; i < docsPerPartition; i++) {
+ assert.commandWorked(coll.insert({_id: i, partitionKey: 1, largeStr: Array(1025).toString()}));
}
assert.commandFailedWithCode(coll.runCommand({
@@ -38,24 +39,29 @@ assert.commandFailedWithCode(coll.runCommand({
}),
5643011);
-// The same query passes with a higher memory limit.
+// The same query passes with a higher memory limit. Note that the amount of memory consumed by the
+// stage is roughly double the size of the documents since each document has an internal cache.
+const perDocSize = 1200;
setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
- 3150);
+ (perDocSize * docsPerPartition * 3) + 1024);
assert.commandWorked(coll.runCommand({
aggregate: coll.getName(),
- pipeline: [{$setWindowFields: {sortBy: {partitionKey: 1}, output: {val: {$sum: "$_id"}}}}],
+ pipeline: [{$setWindowFields: {sortBy: {partitionKey: 1}, output: {val: {$sum: "$largeStr"}}}}],
cursor: {}
}));
// The query passes with multiple partitions of the same size.
-for (let i = 0; i < 10; i++) {
- coll.insert({_id: i, partitionKey: 2, str: "str"});
+for (let i = docsPerPartition; i < docsPerPartition * 2; i++) {
+ assert.commandWorked(coll.insert({_id: i, partitionKey: 2, largeStr: Array(1025).toString()}));
}
assert.commandWorked(coll.runCommand({
aggregate: coll.getName(),
pipeline: [{
- $setWindowFields:
- {sortBy: {partitionKey: 1}, partitionBy: "$partitionKey", output: {val: {$sum: "$_id"}}}
+ $setWindowFields: {
+ sortBy: {partitionKey: 1},
+ partitionBy: "$partitionKey",
+ output: {val: {$sum: "$largeStr"}}
+ }
}],
cursor: {}
}));
@@ -67,7 +73,7 @@ assert.commandFailedWithCode(coll.runCommand({
$setWindowFields: {
sortBy: {partitionKey: 1},
partitionBy: "$partitionKey",
- output: {val: {$push: "$_id", window: {documents: [-9, 9]}}}
+ output: {val: {$max: "$largeStr", window: {documents: [-9, 9]}}}
}
}],
cursor: {}
diff --git a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
index bde78c1dade..0e7bbadde59 100644
--- a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
+++ b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
@@ -110,42 +110,40 @@ for (let i = 0; i < results.length; i++) {
}
}
checkProfilerForDiskWrite(db);
-// Test that an explain that executes the query reports usedDisk correctly.
-let explainPipeline = [
- {
- $setWindowFields: {
- partitionBy: "$partition",
- sortBy: {partition: 1},
- output: {arr: {$sum: "$val", window: {documents: [-21, 21]}}}
- }
- },
- {$sort: {_id: 1}}
-];
// We don't execute setWindowFields in a sharded explain.
if (!FixtureHelpers.isMongos(db)) {
+ // Test that an explain that executes the query reports usedDisk correctly.
+ let explainPipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: "$partition",
+ sortBy: {partition: 1},
+ output: {arr: {$sum: "$val", window: {documents: [-21, 21]}}}
+ }
+ },
+ {$sort: {_id: 1}}
+ ];
+
let stages = getAggPlanStages(
coll.explain("allPlansExecution").aggregate(explainPipeline, {allowDiskUse: true}),
"$_internalSetWindowFields");
assert(stages[0]["usedDisk"], stages);
-}
-setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
- "internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
- avgDocSize * largePartitionSize * 2);
-explainPipeline = [
- {
- $setWindowFields: {
- partitionBy: "$partition",
- sortBy: {partition: 1},
- output: {arr: {$sum: "$val", window: {documents: [0, 0]}}}
- }
- },
- {$sort: {_id: 1}}
-];
+ setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
+ "internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
+ avgDocSize * largePartitionSize * 2);
+ explainPipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: "$partition",
+ sortBy: {partition: 1},
+ output: {arr: {$sum: "$val", window: {documents: [0, 0]}}}
+ }
+ },
+ {$sort: {_id: 1}}
+ ];
-// We don't execute setWindowFields in a sharded explain.
-if (!FixtureHelpers.isMongos(db)) {
- let stages = getAggPlanStages(
+ stages = getAggPlanStages(
coll.explain("allPlansExecution").aggregate(explainPipeline, {allowDiskUse: true}),
"$_internalSetWindowFields");
assert(!stages[0]["usedDisk"], stages);
@@ -185,7 +183,7 @@ for (let i = 0; i < results.length; i++) {
// $push uses about ~950 to store all the values in the second partition.
setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
- 750);
+ avgDocSize * 2);
assert.commandFailedWithCode(db.runCommand({
aggregate: coll.getName(),
pipeline: [
diff --git a/src/mongo/db/exec/document_value/document.cpp b/src/mongo/db/exec/document_value/document.cpp
index 3ef9249fe2c..54f7f22c433 100644
--- a/src/mongo/db/exec/document_value/document.cpp
+++ b/src/mongo/db/exec/document_value/document.cpp
@@ -362,6 +362,14 @@ void DocumentStorage::reset(const BSONObj& bson, bool stripMetadata) {
_metadataFields = DocumentMetadataFields{};
}
+void DocumentStorage::fillCache() const {
+ for (DocumentStorageIterator it = iterator(); !it.atEnd(); it.advance()) {
+ // Retrieve the value and force it to be cached.
+ if (it->val.getType() == BSONType::Object)
+ it->val.getDocument().fillCache();
+ }
+}
+
void DocumentStorage::loadLazyMetadata() const {
if (_haveLazyLoadedMetadata) {
return;
diff --git a/src/mongo/db/exec/document_value/document.h b/src/mongo/db/exec/document_value/document.h
index 68473c041e6..36c4106357d 100644
--- a/src/mongo/db/exec/document_value/document.h
+++ b/src/mongo/db/exec/document_value/document.h
@@ -130,7 +130,12 @@ public:
Document& operator=(const Document&) = default;
Document& operator=(Document&&) = default;
- /// Look up a field by key name. Returns Value() if no such field. O(1)
+ /**
+ * Look up a field by key name. Returns Value() if no such field. O(1).
+ *
+ * Note that this method does *not* traverse nested documents and arrays, use getNestedField()
+ * instead.
+ */
const Value operator[](StringData key) const {
return getField(key);
}
@@ -234,6 +239,13 @@ public:
return out << doc.toString();
}
+ /**
+ * Populates the internal cache by recursively walking the underlying BSON.
+ */
+ void fillCache() const {
+ storage().fillCache();
+ }
+
/** Calculate a hash value.
*
* Meant to be used to create composite hashes suitable for
diff --git a/src/mongo/db/exec/document_value/document_internal.h b/src/mongo/db/exec/document_value/document_internal.h
index d51915ef9c0..9be057fb522 100644
--- a/src/mongo/db/exec/document_value/document_internal.h
+++ b/src/mongo/db/exec/document_value/document_internal.h
@@ -303,6 +303,11 @@ public:
void reset(const BSONObj& bson, bool stripMetadata);
+ /**
+ * Populates the cache by recursively walking the underlying BSON.
+ */
+ void fillCache() const;
+
static const DocumentStorage& emptyDoc() {
return kEmptyDoc;
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 992797883f9..57e731d200b 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -472,6 +472,7 @@ env.CppUnitTest(
'granularity_rounder_powers_of_two_test.cpp',
'granularity_rounder_preferred_numbers_test.cpp',
'lookup_set_cache_test.cpp',
+ 'memory_usage_tracker_test.cpp',
'pipeline_metadata_tree_test.cpp',
'pipeline_test.cpp',
'resharding_initial_split_policy_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index a92a05386c5..303db0418b9 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -135,16 +135,18 @@ const char* DocumentSourceGroup::getSourceName() const {
bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory() {
if (!_memoryTracker._allowDiskUse &&
- (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes)) {
+ (_memoryTracker.currentMemoryBytes() >
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes))) {
freeMemory();
}
- if (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes) {
+ if (_memoryTracker.currentMemoryBytes() >
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) {
uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
_memoryTracker._allowDiskUse);
- _memoryTracker.set(0);
+ _memoryTracker.resetCurrent();
return true;
}
return false;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index cbf6509e5c2..b4c3a3d7e60 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -336,8 +336,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::crea
void DocumentSourceInternalSetWindowFields::initialize() {
for (auto& wfs : _outputFields) {
_executableOutputs[wfs.fieldName] =
- WindowFunctionExec::create(pExpCtx.get(), &_iterator, wfs, _sortBy);
- _memoryTracker.set(wfs.fieldName, _executableOutputs[wfs.fieldName]->getApproximateSize());
+ WindowFunctionExec::create(pExpCtx.get(), &_iterator, wfs, _sortBy, &_memoryTracker);
}
_init = true;
}
@@ -436,8 +435,6 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
return DocumentSource::GetNextResult::makeEOF();
auto curDoc = _iterator.current();
- _memoryTracker.setInternal("PartitionIterator", _iterator.getApproximateSize());
-
// The only way we hit this case is if there are no documents, since otherwise _eof will be set.
if (!curDoc) {
_eof = true;
@@ -456,19 +453,20 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
throw;
}
- // Update the memory usage for this function after getNext().
- _memoryTracker.set(fieldName, function->getApproximateSize());
- // Account for the additional memory in the iterator cache.
- _memoryTracker.setInternal("PartitionIterator", _iterator.getApproximateSize());
- if (_memoryTracker.currentMemoryBytes() >= _memoryTracker._maxAllowedMemoryUsageBytes &&
+ if (_memoryTracker.currentMemoryBytes() >=
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes) &&
_memoryTracker._allowDiskUse) {
// Attempt to spill where possible.
_iterator.spillToDisk();
- _memoryTracker.setInternal("PartitionIterator", _iterator.getApproximateSize());
}
- if (_memoryTracker.currentMemoryBytes() > _memoryTracker._maxAllowedMemoryUsageBytes) {
+ if (_memoryTracker.currentMemoryBytes() >
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) {
_iterator.finalize();
- uasserted(5414201, "Exceeded memory limit in DocumentSourceSetWindowFields");
+ uasserted(5414201,
+ str::stream()
+ << "Exceeded memory limit in DocumentSourceSetWindowFields, used "
+ << _memoryTracker.currentMemoryBytes() << " bytes but max allowed is "
+ << _memoryTracker._maxAllowedMemoryUsageBytes);
}
}
@@ -483,6 +481,9 @@ DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext()
for (auto&& [fieldName, function] : _executableOutputs) {
function->reset();
}
+
+ // Account for the memory in the iterator for the new partition.
+ _memoryTracker.set(_iterator.getApproximateSize());
break;
case PartitionIterator::AdvanceResult::kEOF:
_eof = true;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h
index 3d22202601c..f854de92653 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.h
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.h
@@ -112,8 +112,8 @@ public:
_partitionBy(partitionBy),
_sortBy(std::move(sortBy)),
_outputFields(std::move(outputFields)),
- _iterator(expCtx.get(), pSource, std::move(partitionBy), _sortBy, maxMemoryBytes),
- _memoryTracker{pExpCtx->allowDiskUse, maxMemoryBytes} {};
+ _memoryTracker{expCtx->allowDiskUse, maxMemoryBytes},
+ _iterator(expCtx.get(), pSource, &_memoryTracker, std::move(partitionBy), _sortBy){};
GetModPathsReturn getModifiedPaths() const final {
std::set<std::string> outputPaths;
@@ -185,9 +185,9 @@ private:
boost::optional<boost::intrusive_ptr<Expression>> _partitionBy;
boost::optional<SortPattern> _sortBy;
std::vector<WindowFunctionStatement> _outputFields;
+ MemoryUsageTracker _memoryTracker;
PartitionIterator _iterator;
StringMap<std::unique_ptr<WindowFunctionExec>> _executableOutputs;
- MemoryUsageTracker _memoryTracker;
bool _init = false;
bool _eof = false;
};
diff --git a/src/mongo/db/pipeline/memory_usage_tracker.h b/src/mongo/db/pipeline/memory_usage_tracker.h
index b8aad731b07..4d344b5fb2e 100644
--- a/src/mongo/db/pipeline/memory_usage_tracker.h
+++ b/src/mongo/db/pipeline/memory_usage_tracker.h
@@ -32,6 +32,7 @@
#include <memory>
#include <utility>
+#include "mongo/util/str.h"
#include "mongo/util/string_map.h"
namespace mongo {
@@ -44,18 +45,23 @@ class MemoryUsageTracker {
public:
class PerFunctionMemoryTracker {
public:
- PerFunctionMemoryTracker() = default;
-
- void update(int diff) {
- _currentMemoryBytes += diff;
- if (_currentMemoryBytes > _maxMemoryBytes)
- _maxMemoryBytes = _currentMemoryBytes;
+ explicit PerFunctionMemoryTracker(MemoryUsageTracker* base) : base(base){};
+ PerFunctionMemoryTracker() = delete;
+
+ void update(long long diff) {
+ tassert(5578603,
+ str::stream() << "Underflow on memory tracking, attempting to add " << diff
+ << " but only " << _currentMemoryBytes << " available",
+ diff >= 0 || _currentMemoryBytes >= std::abs(diff));
+ set(_currentMemoryBytes + diff);
}
- void set(uint64_t total) {
+ void set(long long total) {
if (total > _maxMemoryBytes)
_maxMemoryBytes = total;
+ long long prior = _currentMemoryBytes;
_currentMemoryBytes = total;
+ base->update(total - prior);
}
auto currentMemoryBytes() const {
@@ -66,38 +72,30 @@ public:
return _maxMemoryBytes;
}
+ MemoryUsageTracker* base = nullptr;
+
private:
// Maximum memory consumption thus far observed for this function.
- uint64_t _maxMemoryBytes = 0;
+ long long _maxMemoryBytes = 0;
// Tracks the current memory footprint.
- uint64_t _currentMemoryBytes = 0;
+ long long _currentMemoryBytes = 0;
};
- MemoryUsageTracker(bool allowDiskUse, size_t maxMemoryUsageBytes)
+ MemoryUsageTracker(bool allowDiskUse = false, size_t maxMemoryUsageBytes = 0)
: _allowDiskUse(allowDiskUse), _maxAllowedMemoryUsageBytes(maxMemoryUsageBytes) {}
/**
- * Sets the new total for 'functionName', and updates the current total memory usage.
+ * Sets the new total for 'name', and updates the current total memory usage.
*/
- void set(StringData functionName, uint64_t total) {
- auto oldFuncUsage = _functionMemoryTracker[functionName].currentMemoryBytes();
- _functionMemoryTracker[functionName].set(total);
- update(total - oldFuncUsage);
- }
-
- void setInternal(StringData functionName, uint64_t total) {
- auto oldFuncUsage = _internalMemoryTracker[functionName].currentMemoryBytes();
- _internalMemoryTracker[functionName].set(total);
- _memoryUsageBytes += total - oldFuncUsage;
- if (_memoryUsageBytes > _maxMemoryUsageBytes) {
- _maxMemoryUsageBytes = _memoryUsageBytes;
- }
+ void set(StringData name, long long total) {
+ _functionMemoryTracker.try_emplace(name, this);
+ _functionMemoryTracker.find(name)->second.set(total);
}
/**
* Sets the new current memory usage in bytes.
*/
- void set(uint64_t total) {
+ void set(long long total) {
_memoryUsageBytes = total;
if (_memoryUsageBytes > _maxMemoryUsageBytes) {
_maxMemoryUsageBytes = _memoryUsageBytes;
@@ -109,13 +107,10 @@ public:
* current value for maximum total memory usage.
*/
void resetCurrent() {
- _memoryUsageBytes = 0;
for (auto& [_, funcTracker] : _functionMemoryTracker) {
funcTracker.set(0);
}
- for (auto& [_, funcTracker] : _internalMemoryTracker) {
- funcTracker.set(0);
- }
+ _memoryUsageBytes = 0;
}
/**
@@ -127,36 +122,35 @@ public:
<< name,
_functionMemoryTracker.find(name) != _functionMemoryTracker.end());
return _functionMemoryTracker.at(name);
- MONGO_UNREACHABLE;
}
- auto readInternal(StringData name) const {
- tassert(5643009,
- str::stream() << "Invalid call to memory usage tracker, could not find function "
- << name,
- _internalMemoryTracker.find(name) != _internalMemoryTracker.end());
- return _internalMemoryTracker.at(name);
+ /**
+ * Non-const version, creates a new element if one doesn't exist and returns a reference to it.
+ */
+ PerFunctionMemoryTracker& operator[](StringData name) {
+ _functionMemoryTracker.try_emplace(name, this);
+ return _functionMemoryTracker.at(name);
}
/**
- * Updates the memory usage for 'functionName' by adding 'diff' to the current memory usage for
+ * Updates the memory usage for 'name' by adding 'diff' to the current memory usage for
* that function. Also updates the total memory usage.
*/
- void update(StringData name, int diff) {
- _functionMemoryTracker[name].update(diff);
- update(diff);
+ void update(StringData name, long long diff) {
+ _functionMemoryTracker.try_emplace(name, this);
+ _functionMemoryTracker.find(name)->second.update(diff);
}
/**
* Updates total memory usage.
*/
- void update(int diff) {
+ void update(long long diff) {
+ tassert(5578602,
+ str::stream() << "Underflow on memory tracking, attempting to add " << diff
+ << " but only " << _memoryUsageBytes << " available",
+ diff >= 0 || (int)_memoryUsageBytes >= -1 * diff);
set(_memoryUsageBytes + diff);
}
- void updateInternal(StringData name, int diff) {
- _internalMemoryTracker[name].update(diff);
- _memoryUsageBytes += diff;
- }
auto currentMemoryBytes() const {
return _memoryUsageBytes;
@@ -170,14 +164,11 @@ public:
private:
// Tracks current memory used.
- size_t _memoryUsageBytes = 0;
- size_t _maxMemoryUsageBytes = 0;
+ long long _memoryUsageBytes = 0;
+ long long _maxMemoryUsageBytes = 0;
// Tracks memory consumption per function using the output field name as a key.
StringMap<PerFunctionMemoryTracker> _functionMemoryTracker;
- // Tracks memory consumption of internal values so there is no worry of colliding with a user
- // field name.
- StringMap<PerFunctionMemoryTracker> _internalMemoryTracker;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/memory_usage_tracker_test.cpp b/src/mongo/db/pipeline/memory_usage_tracker_test.cpp
new file mode 100644
index 00000000000..f0514d86758
--- /dev/null
+++ b/src/mongo/db/pipeline/memory_usage_tracker_test.cpp
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/memory_usage_tracker.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class MemoryUsageTrackerTest : public unittest::Test {
+public:
+ static constexpr auto kDefaultMax = 1 * 1024; // 1KB max.
+ MemoryUsageTrackerTest()
+ : _tracker(false /** allowDiskUse */, kDefaultMax), _funcTracker(&_tracker) {}
+
+
+ MemoryUsageTracker _tracker;
+ MemoryUsageTracker::PerFunctionMemoryTracker _funcTracker;
+};
+
+TEST_F(MemoryUsageTrackerTest, SetUpdatesCurrentAndMax) {
+ _tracker.set(50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 50LL);
+
+ _tracker.set(_tracker.currentMemoryBytes() + 50);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 100LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 100LL);
+}
+
+TEST_F(MemoryUsageTrackerTest, SetFunctionUsageUpdatesGlobal) {
+ _tracker.set(50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 50LL);
+
+ // 50 global + 50 _funcTracker.
+ _funcTracker.set(50);
+ ASSERT_EQ(_funcTracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_funcTracker.maxMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 100LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 100LL);
+
+ // New tracker adds another 50, 150 total.
+ _tracker.set("newTracker", 50);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 150LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 150LL);
+
+ // Lower usage of function tracker is reflected in global.
+ _tracker.set("newTracker", 0);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 100LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 150LL);
+}
+
+TEST_F(MemoryUsageTrackerTest, UpdateUsageUpdatesGlobal) {
+ _tracker.set(50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 50LL);
+
+ // Add another 50 to the global, 100 total.
+ _tracker.update(50);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 100LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 100LL);
+
+ // Function tracker adds another 50, 150 total.
+ _funcTracker.update(50);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 150LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 150LL);
+
+ // Lower usage of function tracker is reflected in global.
+ _funcTracker.update(-25);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 125LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 150LL);
+}
+
+DEATH_TEST_F(MemoryUsageTrackerTest,
+ UpdateGlobalToNegativeIsDisallowed,
+ "Underflow on memory tracking") {
+ _tracker.set(50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 50LL);
+
+ _tracker.update(-100);
+}
+
+DEATH_TEST_F(MemoryUsageTrackerTest,
+ UpdateFunctionUsageToNegativeIsDisallowed,
+ "Underflow on memory tracking") {
+ _funcTracker.set(50LL);
+ ASSERT_EQ(_tracker.currentMemoryBytes(), 50LL);
+ ASSERT_EQ(_tracker.maxMemoryBytes(), 50LL);
+
+ _funcTracker.update(-100);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
index 07e9d55bbf3..59afdf4d94a 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
@@ -80,16 +80,16 @@ static optional<boost::intrusive_ptr<ExpressionFieldPath>> exprFromSort(
PartitionIterator::PartitionIterator(ExpressionContext* expCtx,
DocumentSource* source,
+ MemoryUsageTracker* tracker,
optional<boost::intrusive_ptr<Expression>> partitionExpr,
- const optional<SortPattern>& sortPattern,
- size_t maxMem)
+ const optional<SortPattern>& sortPattern)
: _expCtx(expCtx),
_source(source),
_partitionExpr(std::move(partitionExpr)),
_sortExpr(exprFromSort(_expCtx, sortPattern)),
- _state(IteratorState::kNotInitialized) {
- _cache = std::make_unique<SpillableCache>(_expCtx, maxMem);
-}
+ _state(IteratorState::kNotInitialized),
+ _cache(std::make_unique<SpillableCache>(_expCtx, tracker)),
+ _tracker(tracker) {}
optional<Document> PartitionIterator::operator[](int index) {
auto docDesired = _indexOfCurrentInPartition + index;
@@ -471,6 +471,11 @@ void PartitionIterator::getNextDocument() {
return;
auto doc = getNextRes.releaseDocument();
+
+ // Greedily populate the internal document cache up front so that memory tracking is simpler
+ // than detecting the changing document size during execution of each function.
+ doc.fillCache();
+
if (_partitionExpr) {
// Because partitioning is achieved by sorting in $setWindowFields, and missing fields and
// nulls are considered equivalent in sorting, documents with missing fields and nulls may
@@ -486,9 +491,11 @@ void PartitionIterator::getNextDocument() {
!curKey.isArray());
if (_state == IteratorState::kNotInitialized) {
_nextPartition = NextPartitionState{std::move(doc), std::move(curKey)};
+ _tracker->update(getNextPartitionStateSize());
advanceToNextPartition();
} else if (_expCtx->getValueComparator().compare(curKey, _partitionKey) != 0) {
_nextPartition = NextPartitionState{std::move(doc), std::move(curKey)};
+ _tracker->update(getNextPartitionStateSize());
_state = IteratorState::kAwaitingAdvanceToNext;
} else {
_cache->addDocument(std::move(doc));
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h
index f5fe2883d35..52bf2547582 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.h
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.h
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/pipeline/window_function/spillable_cache.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -48,17 +49,14 @@ namespace mongo {
*
* The 'sortPattern' is used for resolving range-based and time-based bounds, in 'getEndpoints()'.
*
- * 'maxMem' is the maximum amount of memory (in bytes) the PartitionIterator is allowed to use.
- * Depending on whether disk use is allowed when the memory limit is hit it will either spill to
- * disk or throw.
*/
class PartitionIterator {
public:
PartitionIterator(ExpressionContext* expCtx,
DocumentSource* source,
+ MemoryUsageTracker* tracker,
boost::optional<boost::intrusive_ptr<Expression>> partitionExpr,
- const boost::optional<SortPattern>& sortPattern,
- size_t maxMem);
+ const boost::optional<SortPattern>& sortPattern);
using SlotId = unsigned int;
SlotId newSlot() {
@@ -245,6 +243,10 @@ private:
"Invalid call to PartitionIterator::advanceToNextPartition",
_nextPartition != boost::none);
resetCache();
+ // The memory accounted for in the _nextPartition will be moved to the spillable cache, so
+ // subtract it out here.
+ _tracker->update(-1 * getNextPartitionStateSize());
+
// Cache is cleared, and we are moving the _nextPartition value to different positions.
_cache->addDocument(std::move(_nextPartition->_doc));
_partitionKey = std::move(_nextPartition->_partitionKey);
@@ -308,6 +310,8 @@ private:
// The actual cache of the PartitionIterator. Holds documents and spills documents that exceed
// the memory limit given to PartitionIterator to disk. Behaves like a deque.
std::unique_ptr<SpillableCache> _cache = nullptr;
+
+ MemoryUsageTracker* _tracker;
};
/**
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
index 855fffdf2c5..de702eb112f 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
@@ -47,20 +47,28 @@ public:
auto makeDefaultAccessor(
boost::intrusive_ptr<DocumentSourceMock> mock,
boost::optional<boost::intrusive_ptr<Expression>> partExpr = boost::none) {
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- mock.get(),
- partExpr,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ if (!_iter)
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), mock.get(), &_tracker, partExpr, boost::none);
return PartitionAccessor(_iter.get(), PartitionAccessor::Policy::kDefaultSequential);
}
+ auto makeEndpointAccessor(
+ boost::intrusive_ptr<DocumentSourceMock> mock,
+ boost::optional<boost::intrusive_ptr<Expression>> partExpr = boost::none) {
+ if (!_iter)
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), mock.get(), &_tracker, partExpr, boost::none);
+ return PartitionAccessor(_iter.get(), PartitionAccessor::Policy::kEndpoints);
+ }
+
auto advance() {
invariant(_iter);
return _iter->advance();
}
-private:
+protected:
+ MemoryUsageTracker _tracker{false, 100 * 1024 * 1024 /* default memory limit */};
std::unique_ptr<PartitionIterator> _iter;
};
@@ -276,12 +284,7 @@ TEST_F(PartitionIteratorTest, OutsideOfPartitionAccessShouldNotTassert) {
const auto docs =
std::deque<DocumentSource::GetNextResult>{Document{{"a", 1}}, Document{{"a", 2}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
- auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
+ auto accessor = makeDefaultAccessor(mock, boost::none);
// Test that an accessor that attempts to read off the end of the partition returns boost::none
// instead of tassert'ing.
@@ -293,16 +296,11 @@ DEATH_TEST_F(PartitionIteratorTest, SingleConsumerDefaultPolicy, "Requested expi
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
- auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
+ auto accessor = makeDefaultAccessor(mock, boost::none);
// Access the first document, which marks it as expired.
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *accessor[0]);
// Advance the iterator which frees the first expired document.
- partIter.advance();
+ advance();
// Attempting to access the first doc results in a tripwire assertion.
ASSERT_THROWS_CODE(accessor[-1], AssertionException, 5643005);
}
@@ -311,25 +309,18 @@ DEATH_TEST_F(PartitionIteratorTest, MultipleConsumerDefaultPolicy, "Requested ex
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
- auto laggingAccessor =
- PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
- auto leadingAccessor =
- PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
+ auto laggingAccessor = makeDefaultAccessor(mock, boost::none);
+ auto leadingAccessor = makeDefaultAccessor(mock, boost::none);
// The lagging accessor is referencing 1 doc behind current, and leading is 1 doc ahead.
ASSERT_FALSE(laggingAccessor[-1]);
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *leadingAccessor[1]);
- partIter.advance();
+ advance();
// At this point, no documents are expired.
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *laggingAccessor[-1]);
ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *leadingAccessor[1]);
- partIter.advance();
+ advance();
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *laggingAccessor[-1]);
// The leading accessor has fallen off the right side of the partition.
@@ -344,12 +335,7 @@ DEATH_TEST_F(PartitionIteratorTest, SingleConsumerEndpointPolicy, "Requested exp
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
- auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
+ auto accessor = makeEndpointAccessor(mock, boost::none);
// Mock a window with documents [1, 2].
auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(1 << 2)),
SortPattern(BSON("a" << 1), getExpCtx()),
@@ -358,35 +344,30 @@ DEATH_TEST_F(PartitionIteratorTest, SingleConsumerEndpointPolicy, "Requested exp
// below the lower bound are not needed.
auto endpoints = accessor.getEndpoints(bounds);
// Advance the iterator which frees the first expired document.
- partIter.advance();
+ advance();
// Advancing again does not trigger any expiration since there has not been a subsequent call to
// getEndpoints().
- partIter.advance();
+ advance();
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *accessor[-1]);
endpoints = accessor.getEndpoints(bounds);
// Now the second document, currently at index 0 in the cache, will be released.
- partIter.advance();
- ASSERT_THROWS_CODE(accessor[-1], AssertionException, 5643005);
- ASSERT_THROWS_CODE(accessor[-2], AssertionException, 5643005);
+ advance();
+ ASSERT_THROWS_CODE(accessor[-1], AssertionException, 5371202);
+ ASSERT_THROWS_CODE(accessor[-2], AssertionException, 5371202);
}
DEATH_TEST_F(PartitionIteratorTest, MultipleConsumerEndpointPolicy, "Requested expired document") {
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
// Create two endpoint accessors, one at [-1, 0] and another at [0, 1]. Since the first one may
// access the document at (current - 1), the only expiration that can happen on advance() would
// be (newCurrent - 2).
- auto lookBehindAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
- auto lookAheadAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
+ auto lookBehindAccessor = makeEndpointAccessor(mock, boost::none);
+ auto lookAheadAccessor = makeEndpointAccessor(mock, boost::none);
auto negBounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-1 << 0)),
SortPattern(BSON("a" << 1), getExpCtx()),
getExpCtx().get());
@@ -401,7 +382,7 @@ DEATH_TEST_F(PartitionIteratorTest, MultipleConsumerEndpointPolicy, "Requested e
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *lookAheadAccessor[endpoints->first]);
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *lookAheadAccessor[endpoints->second]);
// Advance the iterator which does not free any documents.
- partIter.advance();
+ advance();
endpoints = lookBehindAccessor.getEndpoints(negBounds);
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *lookBehindAccessor[endpoints->first]);
@@ -411,7 +392,7 @@ DEATH_TEST_F(PartitionIteratorTest, MultipleConsumerEndpointPolicy, "Requested e
ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *lookAheadAccessor[endpoints->second]);
// Advance again, the current document is now {a: 3}.
- partIter.advance();
+ advance();
endpoints = lookBehindAccessor.getEndpoints(negBounds);
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *lookBehindAccessor[endpoints->first]);
ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *lookBehindAccessor[endpoints->second]);
@@ -421,7 +402,7 @@ DEATH_TEST_F(PartitionIteratorTest, MultipleConsumerEndpointPolicy, "Requested e
// Since both accessors are done with document 0, the next advance will free it but keep around
// the other docs.
- partIter.advance();
+ advance();
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *lookBehindAccessor[-2]);
ASSERT_THROWS_CODE(lookBehindAccessor[-3], AssertionException, 5643005);
}
@@ -432,11 +413,8 @@ DEATH_TEST_F(PartitionIteratorTest,
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ auto partIter =
+ PartitionIterator(getExpCtx().get(), mock.get(), &_tracker, boost::none, boost::none);
auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kRightEndpoint);
// Use a window of 'documents: [-2, -1]'.
auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-2 << -1)),
@@ -470,14 +448,8 @@ DEATH_TEST_F(PartitionIteratorTest, MixedPolicy, "Requested expired document") {
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(),
- mock.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
- auto endpointAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
- auto defaultAccessor =
- PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
+ auto endpointAccessor = makeEndpointAccessor(mock, boost::none);
+ auto defaultAccessor = makeDefaultAccessor(mock, boost::none);
// Mock a window with documents [1, 2].
auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(1 << 2)),
SortPattern(BSON("a" << 1), getExpCtx()),
@@ -490,10 +462,10 @@ DEATH_TEST_F(PartitionIteratorTest, MixedPolicy, "Requested expired document") {
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *defaultAccessor[1]);
// Advance the iterator which frees the first expired document.
- partIter.advance();
+ advance();
// Advance again to get the iterator to document {a: 3}.
- partIter.advance();
+ advance();
// Adjust the default accessor to refer back to the document {a: 2}.
ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *defaultAccessor[-1]);
// Keep the same endpoint accessor, which will only include the last document.
@@ -503,12 +475,102 @@ DEATH_TEST_F(PartitionIteratorTest, MixedPolicy, "Requested expired document") {
// Since the default accessor has not read {a: 3} yet, it won't be released after another
// advance.
- partIter.advance();
+ advance();
ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *defaultAccessor[-1]);
// The iterator is currently at {a: 4}, with {a: 1} and {a: 2} both being released.
ASSERT_THROWS_CODE(defaultAccessor[-2], AssertionException, 5643005);
}
+TEST_F(PartitionIteratorTest, MemoryUsageAccountsForDocumentIteratorCache) {
+ std::string largeStr(1024, 'x');
+ auto bsonDoc = BSON("a" << largeStr);
+ const auto docs =
+ std::deque<DocumentSource::GetNextResult>{Document(bsonDoc), Document(bsonDoc)};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+
+ [[maybe_unused]] auto accessor = makeDefaultAccessor(mock, boost::none);
+ size_t initialDocSize = docs[0].getDocument().getApproximateSize();
+
+ // Pull in the first document, and verify the reported size of the iterator is roughly double
+ // the size of the document.
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[0].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 2);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 2 + 500);
+
+ // Pull in the second document. Both docs remain in the cache so the reported memory should
+ // include both.
+ advance();
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[1].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 2 * 2);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 2 * 2 + 500);
+}
+
+TEST_F(PartitionIteratorTest, MemoryUsageAccountsForArraysInDocumentIteratorCache) {
+ std::string largeStr(1024, 'x');
+ auto bsonDoc = BSON("arr" << BSON_ARRAY(BSON("subObj" << largeStr)));
+ const auto docs =
+ std::deque<DocumentSource::GetNextResult>{Document(bsonDoc), Document(bsonDoc)};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+
+ [[maybe_unused]] auto accessor = makeDefaultAccessor(mock, boost::none);
+ size_t initialDocSize = docs[0].getDocument().getApproximateSize();
+
+ // Pull in the first document, and verify the reported size of the iterator is more than
+ // double the size of the document, by less than 1024 bytes. The extra usage is attributed to
+ // 'largeStr' being cached for both the 'arr' element and the nested 'subObj' element.
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[0].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 2);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 2 + 1024);
+
+ // Pull in the second document. Both docs remain in the cache so the reported memory should
+ // include both.
+ advance();
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[1].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), (initialDocSize * 2) * 2);
+ ASSERT_LT(_iter->getApproximateSize(), (initialDocSize * 2) * 2 + 1024);
+}
+
+TEST_F(PartitionIteratorTest, MemoryUsageAccountsForNestedObjInDocumentIteratorCache) {
+ std::string largeStr(1024, 'x');
+ auto bsonDoc = BSON(
+ "obj" << BSON("subObj" << BSON("subObjSubObj" << largeStr) << "uncachedSub" << largeStr));
+ const auto docs = std::deque<DocumentSource::GetNextResult>{Document(bsonDoc)};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+
+ [[maybe_unused]] auto accessor = makeDefaultAccessor(mock, boost::none);
+ size_t initialDocSize = docs[0].getDocument().getApproximateSize();
+
+ // Pull in the first document, and verify the reported size. TODO SERVER-57011: The approximate
+ // size should not double count the nested strings.
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[0].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 3);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 4);
+}
+
+TEST_F(PartitionIteratorTest, MemoryUsageAccountsForReleasedDocuments) {
+ std::string largeStr(1000, 'x');
+ auto bsonDoc = BSON("a" << largeStr);
+ const auto docs =
+ std::deque<DocumentSource::GetNextResult>{Document(bsonDoc), Document(bsonDoc)};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+
+ auto accessor = makeDefaultAccessor(mock, boost::none);
+ size_t initialDocSize = docs[0].getDocument().getApproximateSize();
+
+ // Pull in the first document, and verify the reported size of the iterator is roughly double
+ // the size of the document.
+ ASSERT_DOCUMENT_EQ(*accessor[0], docs[0].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 2);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 2 + 1024);
+
+ // The accessor will have marked the first document as expired, so it will be freed on the next
+ // call to advance().
+ advance();
+ ASSERT_DOCUMENT_EQ(*_iter->current(), docs[1].getDocument());
+ ASSERT_GT(_iter->getApproximateSize(), initialDocSize * 2);
+ ASSERT_LT(_iter->getApproximateSize(), initialDocSize * 2 + 1024);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/spillable_cache.cpp b/src/mongo/db/pipeline/window_function/spillable_cache.cpp
index a3d4a15d878..636335200ec 100644
--- a/src/mongo/db/pipeline/window_function/spillable_cache.cpp
+++ b/src/mongo/db/pipeline/window_function/spillable_cache.cpp
@@ -35,12 +35,6 @@
#include "mongo/db/storage/record_data.h"
namespace mongo {
-void SpillableCache::updateMemoryUsage() {
- _memUsed = 0;
- for (const auto& doc : _memCache) {
- _memUsed += doc.getApproximateSize();
- }
-}
bool SpillableCache::isIdInCache(int id) {
tassert(5643005,
@@ -57,15 +51,17 @@ void SpillableCache::verifyInCache(int id) {
isIdInCache(id));
}
void SpillableCache::addDocument(Document input) {
- updateMemoryUsage();
- _memUsed += input.getApproximateSize();
+ _memTracker.update(input.getApproximateSize());
_memCache.emplace_back(std::move(input));
- if (_memUsed >= _maxMem && _expCtx->allowDiskUse) {
+ if (_memTracker.currentMemoryBytes() >=
+ static_cast<long long>(_memTracker.base->_maxAllowedMemoryUsageBytes) &&
+ _expCtx->allowDiskUse) {
spillToDisk();
}
uassert(5643011,
"Exceeded max memory. Set 'allowDiskUse: true' to spill to disk",
- _memUsed < _maxMem);
+ _memTracker.currentMemoryBytes() <
+ static_cast<long long>(_memTracker.base->_maxAllowedMemoryUsageBytes));
++_nextIndex;
}
Document SpillableCache::getDocumentById(int id) {
@@ -76,7 +72,6 @@ Document SpillableCache::getDocumentById(int id) {
return readDocumentFromMemCacheById(id);
}
void SpillableCache::freeUpTo(int id) {
- updateMemoryUsage();
for (int i = _nextFreedIndex; i <= id; ++i) {
verifyInCache(i);
// Deleting is expensive in WT. Only delete in memory documents, documents in the record
@@ -85,8 +80,7 @@ void SpillableCache::freeUpTo(int id) {
tassert(5643010,
"Attempted to remove document from empty memCache in SpillableCache",
_memCache.size() > 0);
- auto docToFree = _memCache.front();
- _memUsed -= docToFree.getApproximateSize();
+ _memTracker.update(-1 * _memCache.front().getApproximateSize());
_memCache.pop_front();
}
++_nextFreedIndex;
@@ -100,7 +94,7 @@ void SpillableCache::clear() {
_diskWrittenIndex = 0;
_nextIndex = 0;
_nextFreedIndex = 0;
- _memUsed = 0;
+ _memTracker.set(0);
}
void SpillableCache::writeBatchToDisk(std::vector<Record>& records) {
@@ -146,7 +140,7 @@ void SpillableCache::spillToDisk() {
++_diskWrittenIndex;
}
_memCache.clear();
- _memUsed = 0;
+ _memTracker.set(0);
if (records.size() == 0) {
return;
}
diff --git a/src/mongo/db/pipeline/window_function/spillable_cache.h b/src/mongo/db/pipeline/window_function/spillable_cache.h
index 7ed7fc8f466..4777baa2145 100644
--- a/src/mongo/db/pipeline/window_function/spillable_cache.h
+++ b/src/mongo/db/pipeline/window_function/spillable_cache.h
@@ -31,6 +31,7 @@
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/storage/temporary_record_store.h"
namespace mongo {
@@ -43,9 +44,18 @@ namespace mongo {
*/
class SpillableCache {
public:
- SpillableCache(ExpressionContext* expCtx, size_t maxMemToUse)
- : _expCtx(expCtx), _maxMem(maxMemToUse) {}
+ SpillableCache(ExpressionContext* expCtx, MemoryUsageTracker* tracker)
+ : _expCtx(expCtx), _memTracker(tracker) {}
+ /**
+ * Adds 'input' to the in-memory cache and spills to disk if the document size puts us over the
+ * memory limit and spilling is allowed.
+ *
+ * Note that the reported approximate size of 'input' may include the internal Document field
+ * cache along with the underlying BSON size, which can change depending on the access pattern.
+ * This class assumes that the size of the Document does not change from the time that it's
+ * added here until it's freed via freeUpTo().
+ */
void addDocument(Document input);
/**
@@ -83,7 +93,7 @@ public:
}
size_t getApproximateSize() {
- return _memUsed;
+ return _memTracker.currentMemoryBytes();
}
int getNumDocs() {
@@ -122,12 +132,7 @@ private:
Document readDocumentFromMemCacheById(int desired);
void verifyInCache(int desired);
void writeBatchToDisk(std::vector<Record>& records);
- // Document size can change while in the cache depending on what fields are accessed. This
- // function updates '_memUsed' based on changes between calls.
- void updateMemoryUsage();
ExpressionContext* _expCtx;
- size_t _maxMem = 0;
- size_t _memUsed = 0;
std::deque<Document> _memCache;
std::unique_ptr<TemporaryRecordStore> _diskCache = nullptr;
@@ -145,6 +150,8 @@ private:
// Be able to report that disk was used after the cache has been finalized.
bool _usedDisk = false;
+
+ MemoryUsageTracker::PerFunctionMemoryTracker _memTracker;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp b/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp
index 85a410671d7..bacd4eb59b7 100644
--- a/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp
+++ b/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp
@@ -110,7 +110,8 @@ public:
}
std::unique_ptr<SpillableCache> createSpillableCache(size_t maxMem) {
- auto cache = std::make_unique<SpillableCache>(_expCtx.get(), maxMem);
+ _tracker = std::make_unique<MemoryUsageTracker>(false, maxMem);
+ auto cache = std::make_unique<SpillableCache>(_expCtx.get(), _tracker.get());
return cache;
}
@@ -128,6 +129,8 @@ public:
}
boost::intrusive_ptr<ExpressionContext> _expCtx;
+ std::unique_ptr<MemoryUsageTracker> _tracker;
+
// Docs are ~200 each.
std::vector<Document> _docSet;
int _lastIndex = 0;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
index 7db96ef7e98..00ca373d6a3 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
@@ -72,7 +72,8 @@ std::unique_ptr<WindowFunctionExec> translateDocumentWindow(
PartitionIterator* iter,
boost::intrusive_ptr<window_function::Expression> expr,
const boost::optional<SortPattern>& sortBy,
- const WindowBounds::DocumentBased& bounds) {
+ const WindowBounds::DocumentBased& bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker) {
auto inputExpr = translateInputExpression(expr, sortBy);
return stdx::visit(
@@ -80,12 +81,12 @@ std::unique_ptr<WindowFunctionExec> translateDocumentWindow(
[&](const WindowBounds::Unbounded&) -> std::unique_ptr<WindowFunctionExec> {
// A left unbounded window will always be non-removable regardless of the upper
// bound.
- return std::make_unique<WindowFunctionExecNonRemovable<AccumulatorState>>(
- iter, inputExpr, expr->buildAccumulatorOnly(), bounds.upper);
+ return std::make_unique<WindowFunctionExecNonRemovable>(
+ iter, inputExpr, expr->buildAccumulatorOnly(), bounds.upper, memTracker);
},
[&](const auto&) -> std::unique_ptr<WindowFunctionExec> {
return std::make_unique<WindowFunctionExecRemovableDocument>(
- iter, inputExpr, expr->buildRemovable(), bounds);
+ iter, inputExpr, expr->buildRemovable(), bounds, memTracker);
}},
bounds.lower);
}
@@ -93,7 +94,8 @@ std::unique_ptr<WindowFunctionExec> translateDocumentWindow(
std::unique_ptr<mongo::WindowFunctionExec> translateDerivative(
window_function::ExpressionDerivative* expr,
PartitionIterator* iter,
- const boost::optional<SortPattern>& sortBy) {
+ const boost::optional<SortPattern>& sortBy,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker) {
tassert(5490703,
"$derivative requires a 1-field ascending sortBy",
sortBy && sortBy->size() == 1 && !sortBy->begin()->expression &&
@@ -104,7 +106,7 @@ std::unique_ptr<mongo::WindowFunctionExec> translateDerivative(
expr->expCtx()->variablesParseState);
return std::make_unique<WindowFunctionExecDerivative>(
- iter, expr->input(), sortExpr, expr->bounds(), expr->outputUnit());
+ iter, expr->input(), sortExpr, expr->bounds(), expr->outputUnit(), memTracker);
}
@@ -114,20 +116,25 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
ExpressionContext* expCtx,
PartitionIterator* iter,
const WindowFunctionStatement& functionStmt,
- const boost::optional<SortPattern>& sortBy) {
+ const boost::optional<SortPattern>& sortBy,
+ MemoryUsageTracker* memTracker) {
+ MemoryUsageTracker::PerFunctionMemoryTracker& functionMemTracker =
+ (*memTracker)[functionStmt.fieldName];
if (auto expr = dynamic_cast<window_function::ExpressionDerivative*>(functionStmt.expr.get())) {
- return translateDerivative(expr, iter, sortBy);
+ return translateDerivative(expr, iter, sortBy, &functionMemTracker);
} else if (auto expr =
dynamic_cast<window_function::ExpressionFirst*>(functionStmt.expr.get())) {
- return std::make_unique<WindowFunctionExecFirst>(iter, expr->input(), expr->bounds());
+ return std::make_unique<WindowFunctionExecFirst>(
+ iter, expr->input(), expr->bounds(), boost::none, &functionMemTracker);
} else if (auto expr =
dynamic_cast<window_function::ExpressionLast*>(functionStmt.expr.get())) {
- return std::make_unique<WindowFunctionExecLast>(iter, expr->input(), expr->bounds());
+ return std::make_unique<WindowFunctionExecLast>(
+ iter, expr->input(), expr->bounds(), &functionMemTracker);
} else if (auto expr =
dynamic_cast<window_function::ExpressionShift*>(functionStmt.expr.get())) {
return std::make_unique<WindowFunctionExecFirst>(
- iter, expr->input(), expr->bounds(), expr->defaultVal());
+ iter, expr->input(), expr->bounds(), expr->defaultVal(), &functionMemTracker);
}
WindowBounds bounds = functionStmt.expr->bounds();
@@ -135,7 +142,8 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
return stdx::visit(
visit_helper::Overloaded{
[&](const WindowBounds::DocumentBased& docBounds) {
- return translateDocumentWindow(iter, functionStmt.expr, sortBy, docBounds);
+ return translateDocumentWindow(
+ iter, functionStmt.expr, sortBy, docBounds, &functionMemTracker);
},
[&](const WindowBounds::RangeBased& rangeBounds)
-> std::unique_ptr<WindowFunctionExec> {
@@ -157,14 +165,16 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
inputExpr,
std::move(sortByExpr),
functionStmt.expr->buildAccumulatorOnly(),
- bounds);
+ bounds,
+ &functionMemTracker);
} else {
return std::make_unique<WindowFunctionExecRemovableRange>(
iter,
inputExpr,
std::move(sortByExpr),
functionStmt.expr->buildRemovable(),
- bounds);
+ bounds,
+ &functionMemTracker);
}
},
},
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h
index 28defe7337f..7f4984d90f6 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.h
@@ -59,7 +59,8 @@ public:
static std::unique_ptr<WindowFunctionExec> create(ExpressionContext* expCtx,
PartitionIterator* iter,
const WindowFunctionStatement& functionStmt,
- const boost::optional<SortPattern>& sortBy);
+ const boost::optional<SortPattern>& sortBy,
+ MemoryUsageTracker* memTracker);
virtual ~WindowFunctionExec() = default;
@@ -73,15 +74,13 @@ public:
*/
virtual void reset() = 0;
- /**
- * Returns how much memory the accumulators or window functions being held are using.
- */
- virtual size_t getApproximateSize() const = 0;
-
protected:
- WindowFunctionExec(PartitionAccessor iter) : _iter(iter){};
+ WindowFunctionExec(PartitionAccessor iter,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : _iter(iter), _memTracker(memTracker){};
PartitionAccessor _iter;
+ MemoryUsageTracker::PerFunctionMemoryTracker* _memTracker;
};
/**
@@ -97,45 +96,43 @@ public:
return _function->getValue();
}
- /**
- * Return the byte size of the values being stored by this class. Does not include the constant
- * size objects being held or the overhead of the data structures.
- */
- size_t getApproximateSize() const final {
- return _function->getApproximateSize() + _memUsageBytes;
- }
-
protected:
WindowFunctionExecRemovable(PartitionIterator* iter,
PartitionAccessor::Policy policy,
boost::intrusive_ptr<Expression> input,
- std::unique_ptr<WindowFunctionState> function)
- : WindowFunctionExec(PartitionAccessor(iter, policy)),
+ std::unique_ptr<WindowFunctionState> function,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExec(PartitionAccessor(iter, policy), memTracker),
_input(std::move(input)),
_function(std::move(function)) {}
+ void reset() {
+ _function->reset();
+ }
+
void addValue(Value v) {
+ long long prior = _function->getApproximateSize();
+ long long valueSize = v.getApproximateSize();
_function->add(v);
_values.push(v);
- _memUsageBytes += v.getApproximateSize();
+ _memTracker->update(valueSize + static_cast<long long>(_function->getApproximateSize()) -
+ prior);
}
void removeValue() {
tassert(5429400, "Tried to remove more values than we added", !_values.empty());
auto v = _values.front();
+ long long prior = _function->getApproximateSize();
_function->remove(v);
+ _memTracker->update(static_cast<long long>(_function->getApproximateSize()) - prior -
+ v.getApproximateSize());
_values.pop();
- _memUsageBytes -= v.getApproximateSize();
}
boost::intrusive_ptr<Expression> _input;
- std::unique_ptr<WindowFunctionState> _function;
// Keep track of values in the window function that will need to be removed later.
std::queue<Value> _values;
- // Track the byte size of the values being stored by this class.
- size_t _memUsageBytes = 0;
-
private:
/**
* This method notifies the executor that the underlying PartitionIterator
@@ -144,6 +141,8 @@ private:
* entered it? which have left it?) and call addValue(), removeValue() as needed.
*/
virtual void update() = 0;
+
+ std::unique_ptr<WindowFunctionState> _function;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.cpp
index 217e2c51943..1adfeb81dad 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.cpp
@@ -31,15 +31,6 @@
namespace mongo {
-namespace {
-// Convert expected error codes to BSONNULL, but uassert other unexpected codes.
-Value orNull(StatusWith<Value> val) {
- if (val.getStatus().code() == ErrorCodes::BadValue)
- return Value(BSONNULL);
- return uassertStatusOK(val);
-}
-} // namespace
-
Value WindowFunctionExecDerivative::getNext() {
auto endpoints = _iter.getEndpoints(_bounds);
if (!endpoints)
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
index 1bac1417b99..ed13364a18c 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
@@ -56,8 +56,10 @@ public:
boost::intrusive_ptr<Expression> position,
boost::intrusive_ptr<Expression> time,
WindowBounds bounds,
- boost::optional<TimeUnit> outputUnit)
- : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)),
+ boost::optional<TimeUnit> outputUnit,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints),
+ memTracker),
_position(std::move(position)),
_time(std::move(time)),
_bounds(std::move(bounds)),
@@ -73,11 +75,6 @@ public:
Value getNext() final;
void reset() final {}
- // This executor does not store any documents as it processes.
- size_t getApproximateSize() const final {
- return 0;
- }
-
private:
boost::intrusive_ptr<Expression> _position;
boost::intrusive_ptr<Expression> _time;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
index 5ecb6080232..8968e474395 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
@@ -52,11 +52,8 @@ public:
WindowBounds bounds,
boost::optional<TimeUnit> timeUnit = boost::none) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- _docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), &_tracker, boost::none, boost::none);
auto position = ExpressionFieldPath::parse(
getExpCtx().get(), positionPath, getExpCtx()->variablesParseState);
@@ -67,7 +64,8 @@ public:
std::move(position),
std::move(time),
std::move(bounds),
- std::move(timeUnit));
+ std::move(timeUnit),
+ &_tracker["output"]);
}
auto advanceIterator() {
@@ -90,6 +88,7 @@ public:
private:
boost::intrusive_ptr<DocumentSourceMock> _docSource;
+ MemoryUsageTracker _tracker{false, 100 * 1024 * 1024 /* default memory limit */};
std::unique_ptr<PartitionIterator> _iter;
};
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
index 775e4aae341..f64b77c28f8 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
@@ -37,18 +37,14 @@
namespace mongo {
class WindowFunctionExecForEndpoint : public WindowFunctionExec {
-public:
- // Endpoint executors are constant size and don't hold any of the values passing through.
- size_t getApproximateSize() const final {
- return 0;
- }
-
protected:
WindowFunctionExecForEndpoint(PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
WindowBounds bounds,
- const boost::optional<Value>& defaultValue = boost::none)
- : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)),
+ const boost::optional<Value>& defaultValue,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints),
+ memTracker),
_input(std::move(input)),
_bounds(std::move(bounds)),
_default(defaultValue.get_value_or(Value(BSONNULL))) {}
@@ -82,8 +78,10 @@ public:
WindowFunctionExecFirst(PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
WindowBounds bounds,
- const boost::optional<Value>& defaultValue = boost::none)
- : WindowFunctionExecForEndpoint(iter, std::move(input), std::move(bounds), defaultValue) {}
+ const boost::optional<Value>& defaultValue,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExecForEndpoint(
+ iter, std::move(input), std::move(bounds), defaultValue, memTracker) {}
Value getNext() {
return getFirst();
@@ -94,8 +92,10 @@ class WindowFunctionExecLast final : public WindowFunctionExecForEndpoint {
public:
WindowFunctionExecLast(PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
- WindowBounds bounds)
- : WindowFunctionExecForEndpoint(iter, std::move(input), std::move(bounds)) {}
+ WindowBounds bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExecForEndpoint(
+ iter, std::move(input), std::move(bounds), boost::none, memTracker) {}
Value getNext() {
return getLast();
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
index 07e555c07c6..0650321c464 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
@@ -58,15 +58,13 @@ public:
auto vps = expCtx->variablesParseState;
auto optKey =
keyPath ? optExp(ExpressionFieldPath::parse(expCtx, *keyPath, vps)) : boost::none;
- _iter = std::make_unique<PartitionIterator>(expCtx,
- _docSource.get(),
- optKey,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ _iter = std::make_unique<PartitionIterator>(
+ expCtx, _docSource.get(), &_tracker, optKey, boost::none);
auto inputField = ExpressionFieldPath::parse(expCtx, "$val", vps);
- return {WindowFunctionExecFirst(_iter.get(), inputField, bounds, defaultVal),
- WindowFunctionExecLast(_iter.get(), inputField, bounds)};
+ return {WindowFunctionExecFirst(
+ _iter.get(), inputField, bounds, defaultVal, &_tracker["first"]),
+ WindowFunctionExecLast(_iter.get(), inputField, bounds, &_tracker["last"])};
}
auto advanceIterator() {
@@ -75,6 +73,7 @@ public:
private:
boost::intrusive_ptr<DocumentSourceMock> _docSource;
+ MemoryUsageTracker _tracker{false, 100 * 1024 * 1024 /* default memory limit */};
std::unique_ptr<PartitionIterator> _iter;
};
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
index 5fda7ceedfd..6d858d7646f 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
@@ -39,12 +39,10 @@ namespace mongo {
/**
* An executor that specifically handles document-based window types which accumulate values with no
- * need to remove old ones. The 'WindowFunc' parameter must expose a 'process()' and corresponding
- * 'getValue()' method to get the accumulation result.
+ * need to remove old ones.
*
* Only the upper bound is needed as the lower bound is always considered to be unbounded.
*/
-template <class WindowFunc>
class WindowFunctionExecNonRemovable final : public WindowFunctionExec {
public:
/**
@@ -57,10 +55,11 @@ public:
*/
WindowFunctionExecNonRemovable(PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
- boost::intrusive_ptr<WindowFunc> function,
- WindowBounds::Bound<int> upperDocumentBound)
- : WindowFunctionExec(
- PartitionAccessor(iter, PartitionAccessor::Policy::kDefaultSequential)),
+ boost::intrusive_ptr<AccumulatorState> function,
+ WindowBounds::Bound<int> upperDocumentBound,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kDefaultSequential),
+ memTracker),
_input(std::move(input)),
_function(std::move(function)),
_upperDocumentBound(upperDocumentBound){};
@@ -81,6 +80,7 @@ public:
if (auto doc = (this->_iter)[upperIndex]) {
_function->process(
_input->evaluate(*doc, &_input->getExpressionContext()->variables), false);
+ _memTracker->set(_function->getMemUsage());
} else {
// Upper bound is out of range, but may be because it's off of the end of the
// partition. For instance, for bounds [unbounded, -1] we won't be able to
@@ -91,18 +91,15 @@ public:
return _function->getValue(false);
}
- size_t getApproximateSize() const final {
- return _function->getMemUsage();
- }
-
void reset() final {
_initialized = false;
_function->reset();
+ _memTracker->set(0);
}
private:
boost::intrusive_ptr<Expression> _input;
- boost::intrusive_ptr<WindowFunc> _function;
+ boost::intrusive_ptr<AccumulatorState> _function;
WindowBounds::Bound<int> _upperDocumentBound;
// In one of two states: either the initial window has not been populated or we are sliding and
@@ -125,6 +122,7 @@ private:
if (auto doc = (this->_iter)[i]) {
_function->process(
_input->evaluate(*doc, &_input->getExpressionContext()->variables), false);
+ _memTracker->set(_function->getMemUsage());
} else {
// Already reached the end of partition for the first value to compute.
break;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h
index 104e5af4d0b..2171247e0c6 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h
@@ -45,8 +45,10 @@ public:
boost::intrusive_ptr<Expression> input,
boost::intrusive_ptr<ExpressionFieldPath> sortExpr,
boost::intrusive_ptr<AccumulatorState> function,
- WindowBounds bounds)
- : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kRightEndpoint)),
+ WindowBounds bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kRightEndpoint),
+ memTracker),
_input(std::move(input)),
_sortExpr(std::move(sortExpr)),
_function(std::move(function)),
@@ -57,10 +59,6 @@ public:
return _function->getValue(false);
}
- size_t getApproximateSize() const final {
- return _function->getMemUsage();
- }
-
void reset() final {
_function->reset();
_lastEndpoints = boost::none;
@@ -80,6 +78,7 @@ private:
} else {
// Transition from nonempty to empty: discard the accumulator state.
_function->reset();
+ _memTracker->set(_function->getMemUsage());
}
} else {
if (endpoints) {
@@ -105,6 +104,7 @@ private:
tassert(5429411, "endpoints must fall in the partition", doc);
Value v = _input->evaluate(*doc, &_input->getExpressionContext()->variables);
_function->process(v, false);
+ _memTracker->set(_function->getMemUsage());
}
boost::intrusive_ptr<Expression> _input;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
index abc679539f5..960d21877cf 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
@@ -51,32 +51,33 @@ namespace {
class WindowFunctionExecNonRemovableTest : public AggregationContextFixture {
public:
template <class AccumulatorType>
- WindowFunctionExecNonRemovable<AccumulatorState> createForFieldPath(
+ WindowFunctionExecNonRemovable createForFieldPath(
std::deque<DocumentSource::GetNextResult> docs,
const std::string& inputPath,
WindowBounds::Bound<int> upper,
boost::optional<std::string> sortByPath = boost::none) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- _docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), &_tracker, boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
if (sortByPath) {
auto sortBy = ExpressionFieldPath::parse(
getExpCtx().get(), *sortByPath, getExpCtx()->variablesParseState);
- return WindowFunctionExecNonRemovable<AccumulatorState>(
+ return WindowFunctionExecNonRemovable(
_iter.get(),
ExpressionArray::create(
getExpCtx().get(),
std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}),
AccumulatorType::create(getExpCtx().get()),
- upper);
+ upper,
+ &_tracker["output"]);
} else {
- return WindowFunctionExecNonRemovable<AccumulatorState>(
- _iter.get(), std::move(input), AccumulatorType::create(getExpCtx().get()), upper);
+ return WindowFunctionExecNonRemovable(_iter.get(),
+ std::move(input),
+ AccumulatorType::create(getExpCtx().get()),
+ upper,
+ &_tracker["output"]);
}
}
@@ -84,6 +85,8 @@ public:
return _iter->advance();
}
+ MemoryUsageTracker _tracker{false, 100 * 1024 * 1024 /* default memory limit */};
+
private:
boost::intrusive_ptr<DocumentSourceMock> _docSource;
std::unique_ptr<PartitionIterator> _iter;
@@ -142,13 +145,13 @@ TEST_F(WindowFunctionExecNonRemovableTest, AccumulateOnlyWithMultiplePartitions)
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
auto iter = PartitionIterator(getExpCtx().get(),
mock.get(),
+ &_tracker,
boost::optional<boost::intrusive_ptr<Expression>>(key),
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ boost::none);
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
- auto mgr = WindowFunctionExecNonRemovable<AccumulatorState>(
- &iter, std::move(input), AccumulatorSum::create(getExpCtx().get()), 1);
+ auto mgr = WindowFunctionExecNonRemovable(
+ &iter, std::move(input), AccumulatorSum::create(getExpCtx().get()), 1, &_tracker["output"]);
ASSERT_VALUE_EQ(Value(1), mgr.getNext());
iter.advance();
// Normally the stage would be responsible for detecting a new partition, for this test reset
@@ -175,17 +178,17 @@ TEST_F(WindowFunctionExecNonRemovableTest, InputExpressionAllowedToCreateVariabl
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- auto iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ auto iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), docSource.get(), &_tracker, boost::none, boost::none);
auto filterBSON =
fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}");
auto input = ExpressionFilter::parse(
getExpCtx().get(), filterBSON.firstElement(), getExpCtx()->variablesParseState);
- auto exec = WindowFunctionExecNonRemovable<AccumulatorState>(
- iter.get(), std::move(input), AccumulatorFirst::create(getExpCtx().get()), 1);
+ auto exec = WindowFunctionExecNonRemovable(iter.get(),
+ std::move(input),
+ AccumulatorFirst::create(getExpCtx().get()),
+ 1,
+ &_tracker["output"]);
// The input is a constant [2, 3] for each document.
ASSERT_VALUE_EQ(Value(std::vector<Value>{Value(2), Value(3)}), exec.getNext());
iter->advance();
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
index 74cd8fd584e..ef053afdc30 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -37,11 +37,14 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
std::unique_ptr<WindowFunctionState> function,
- WindowBounds::DocumentBased bounds)
+ WindowBounds::DocumentBased bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
: WindowFunctionExecRemovable(iter,
PartitionAccessor::Policy::kDefaultSequential,
std::move(input),
- std::move(function)) {
+ std::move(function),
+
+ memTracker) {
stdx::visit(
visit_helper::Overloaded{
[](const WindowBounds::Unbounded&) {
@@ -63,7 +66,7 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
[&](const int& upperIndex) { _upperBound = upperIndex; },
},
bounds.upper);
- _memUsageBytes = sizeof(*this);
+ _memTracker->set(sizeof(*this));
}
void WindowFunctionExecRemovableDocument::initialize() {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
index 2519f8f22a8..d60f6f586b4 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -52,13 +52,14 @@ public:
WindowFunctionExecRemovableDocument(PartitionIterator* iter,
boost::intrusive_ptr<Expression> input,
std::unique_ptr<WindowFunctionState> function,
- WindowBounds::DocumentBased bounds);
+ WindowBounds::DocumentBased bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker);
void reset() final {
- _function->reset();
_values = std::queue<Value>();
_initialized = false;
- _memUsageBytes = sizeof(*this);
+ _memTracker->set(sizeof(*this));
+ WindowFunctionExecRemovable::reset();
}
private:
@@ -69,9 +70,7 @@ private:
if (_values.size() == 0) {
return;
}
- _memUsageBytes -= _values.front().getApproximateSize();
- _function->remove(_values.front());
- _values.pop();
+ removeValue();
}
// In one of two states: either the initial window has not been populated or we are sliding and
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp
index 5783e2da1ec..746baeb44fb 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp
@@ -41,9 +41,13 @@ WindowFunctionExecRemovableRange::WindowFunctionExecRemovableRange(
boost::intrusive_ptr<Expression> input,
boost::intrusive_ptr<ExpressionFieldPath> sortBy,
std::unique_ptr<WindowFunctionState> function,
- WindowBounds bounds)
- : WindowFunctionExecRemovable(
- iter, PartitionAccessor::Policy::kEndpoints, std::move(input), std::move(function)),
+ WindowBounds bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker)
+ : WindowFunctionExecRemovable(iter,
+ PartitionAccessor::Policy::kEndpoints,
+ std::move(input),
+ std::move(function),
+ memTracker),
_sortBy(std::move(sortBy)),
_bounds(std::move(bounds)) {}
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h
index b58416fac7f..29ff3fe262c 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h
@@ -65,17 +65,13 @@ public:
boost::intrusive_ptr<Expression> input,
boost::intrusive_ptr<ExpressionFieldPath> sortBy,
std::unique_ptr<WindowFunctionState> function,
- WindowBounds bounds);
-
- Value getNext() override {
- update();
- return _function->getValue();
- }
+ WindowBounds bounds,
+ MemoryUsageTracker::PerFunctionMemoryTracker* memTracker);
void reset() final {
- _function->reset();
_lastEndpoints = boost::none;
- _memUsageBytes = 0;
+ _memTracker->set(0);
+ WindowFunctionExecRemovable::reset();
}
private:
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
index 554eb1469f3..b681dea2925 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
@@ -55,17 +55,14 @@ public:
const std::string& inputPath,
WindowBounds::DocumentBased bounds) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- _docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), &_tracker, boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
std::unique_ptr<WindowFunctionState> maxFunc =
std::make_unique<WindowFunctionMax>(getExpCtx().get());
return WindowFunctionExecRemovableDocument(
- _iter.get(), std::move(input), std::move(maxFunc), bounds);
+ _iter.get(), std::move(input), std::move(maxFunc), bounds, &_tracker["output"]);
}
WindowFunctionExecRemovableDocument createForFieldPath(
@@ -74,11 +71,8 @@ public:
const std::string& sortByPath,
WindowBounds::DocumentBased bounds) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- _docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), &_tracker, boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
auto sortBy = ExpressionFieldPath::parse(
@@ -90,13 +84,16 @@ public:
ExpressionArray::create(getExpCtx().get(),
std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}),
std::move(integralFunc),
- bounds);
+ bounds,
+ &_tracker["output"]);
}
auto advanceIterator() {
return _iter->advance();
}
+ MemoryUsageTracker _tracker{false, 100 * 1024 * 1024 /* default memory limit */};
+
private:
boost::intrusive_ptr<DocumentSourceMock> _docSource;
std::unique_ptr<PartitionIterator> _iter;
@@ -285,18 +282,24 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
auto key = ExpressionFieldPath::createPathFromString(
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
- auto iter = PartitionIterator{getExpCtx().get(),
- mock.get(),
- boost::optional<boost::intrusive_ptr<Expression>>(key),
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */};
+ MemoryUsageTracker tracker{false, 100 * 1024 * 1024 /* default memory limit */};
+ auto iter = PartitionIterator{
+ getExpCtx().get(),
+ mock.get(),
+ &tracker,
+ boost::optional<boost::intrusive_ptr<Expression>>(key),
+ boost::none,
+ };
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString;
std::unique_ptr<WindowFunctionState> maxFunc =
std::make_unique<WindowFunctionMax>(getExpCtx().get());
- auto mgr = WindowFunctionExecRemovableDocument(
- &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{0, 0});
+ auto mgr = WindowFunctionExecRemovableDocument(&iter,
+ std::move(input),
+ std::move(maxFunc),
+ WindowBounds::DocumentBased{0, 0},
+ &_tracker["output"]);
ASSERT_VALUE_EQ(Value(3), mgr.getNext());
iter.advance();
ASSERT_VALUE_EQ(Value(2), mgr.getNext());
@@ -319,16 +322,23 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
auto mockTwo = DocumentSourceMock::createForTest(std::move(docsTwo), getExpCtx());
auto keyTwo = ExpressionFieldPath::createPathFromString(
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
- auto iter = PartitionIterator{getExpCtx().get(),
- mockTwo.get(),
- boost::optional<boost::intrusive_ptr<Expression>>(keyTwo),
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */};
+ MemoryUsageTracker tracker{false, 100 * 1024 * 1024 /* default memory limit */};
+
+ auto iter = PartitionIterator{
+ getExpCtx().get(),
+ mockTwo.get(),
+ &tracker,
+ boost::optional<boost::intrusive_ptr<Expression>>(keyTwo),
+ boost::none,
+ };
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
auto maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get());
- auto mgr = WindowFunctionExecRemovableDocument(
- &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0});
+ auto mgr = WindowFunctionExecRemovableDocument(&iter,
+ std::move(input),
+ std::move(maxFunc),
+ WindowBounds::DocumentBased{-1, 0},
+ &_tracker["output"]);
ASSERT_VALUE_EQ(Value(3), mgr.getNext());
iter.advance();
ASSERT_VALUE_EQ(Value(3), mgr.getNext());
@@ -344,18 +354,18 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, InputExpressionAllowedToCreateVa
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- auto iter = std::make_unique<PartitionIterator>(getExpCtx().get(),
- docSource.get(),
- boost::none,
- boost::none,
- 100 * 1024 * 1024 /* default memory limit */);
+ auto iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), docSource.get(), &_tracker, boost::none, boost::none);
auto filterBSON =
fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}");
auto input = ExpressionFilter::parse(
getExpCtx().get(), filterBSON.firstElement(), getExpCtx()->variablesParseState);
auto maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get());
- auto mgr = WindowFunctionExecRemovableDocument(
- iter.get(), std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0});
+ auto mgr = WindowFunctionExecRemovableDocument(iter.get(),
+ std::move(input),
+ std::move(maxFunc),
+ WindowBounds::DocumentBased{-1, 0},
+ &_tracker["output"]);
// The input is a constant [2, 3] for each document.
ASSERT_VALUE_EQ(Value(std::vector<Value>{Value(2), Value(3)}), mgr.getNext());
iter->advance();