author    Eric Cox <eric.cox@mongodb.com>  2022-06-13 17:29:05 +0000
committer Eric Cox <eric.cox@mongodb.com>  2022-06-16 18:57:48 +0000
commit    acaf760974171260d12d976124e412c36f6ea5d6 (patch)
tree      8c0ae8a235c20c7ec7b78bd1056bd82903fa2a04
parent    1bf1161e5db69fcee1aaa135b18ae6d9e10e94e5 (diff)
SERVER-66989 Make HashAgg record store key owned
(cherry picked from commit e399422548621ca737903284e8b435c804bb8254)
-rw-r--r--  jstests/noPassthrough/group_tmp_file_cleanup.js  46
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.cpp         4
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.h           5
-rw-r--r--  src/mongo/db/exec/sbe/util/spilling.cpp           3
4 files changed, 54 insertions, 4 deletions
diff --git a/jstests/noPassthrough/group_tmp_file_cleanup.js b/jstests/noPassthrough/group_tmp_file_cleanup.js
new file mode 100644
index 00000000000..4482b5d61fe
--- /dev/null
+++ b/jstests/noPassthrough/group_tmp_file_cleanup.js
@@ -0,0 +1,46 @@
+/**
+ * Test that $group cleans up temporary files under dbpath + '/_tmp'.
+ */
+
+(function() {
+"use strict";
+
+const memoryLimitMb = 1;
+const memoryLimitBytes = memoryLimitMb * 1024 * 1024;
+
+// Start mongod with reduced memory limit for the $group stage.
+const conn = MongoRunner.runMongod({
+ setParameter: {
+ internalDocumentSourceGroupMaxMemoryBytes: memoryLimitBytes,
+ internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: memoryLimitBytes
+ }
+});
+const testDb = conn.getDB(jsTestName());
+
+// Create a collection exceeding the memory limit.
+testDb.largeColl.drop();
+const largeStr = "A".repeat(1024 * 1024); // 1MB string
+for (let i = 0; i < memoryLimitMb + 1; ++i)
+ assert.commandWorked(testDb.largeColl.insert({x: i, largeStr: largeStr + i}));
+
+// Inhibit optimization so that $group runs in the classic engine.
+let pipeline =
+ [{$_internalInhibitOptimization: {}}, {$group: {_id: '$largeStr', minId: {$min: '$_id'}}}];
+
+// Make sure that the pipeline needs to spill to disk.
+assert.throwsWithCode(() => testDb.largeColl.aggregate(pipeline, {allowDiskUse: false}),
+ ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed);
+
+testDb.largeColl.aggregate(pipeline);
+assert.eq(listFiles(conn.dbpath + "/_tmp").length, 0);
+
+// Run the pipeline without $_internalInhibitOptimization so that $group runs in the sbe engine.
+pipeline = [{$group: {_id: '$largeStr', minId: {$min: '$_id'}}}];
+
+// Make sure that the pipeline needs to spill to disk.
+assert.throwsWithCode(() => testDb.largeColl.aggregate(pipeline, {allowDiskUse: false}),
+ ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed);
+testDb.largeColl.aggregate(pipeline);
+
+MongoRunner.stopMongod(conn);
+})();
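A note on the test's sizing: with memoryLimitMb = 1, the loop inserts two documents of roughly 1 MB each, so the $group hash table is guaranteed to blow past the limit. The allowDiskUse: false runs confirm the spill is genuinely required (they must fail with QueryExceededMemoryLimitNoDiskUseAllowed), and the classic-engine run then asserts that dbpath + '/_tmp' is empty once the aggregation finishes.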
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index 73a2375d417..bdb1f7388a6 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -501,9 +501,9 @@ PlanState HashAggStage::getNext() {
                     KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader);
 
                 _aggValueRecordStore = val;
-                BufBuilder buf;
+                _aggKeyRSBuffer.reset();
                 _aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString(
-                    decodeKeyString(nextRecord->id, typeBits), &buf);
+                    decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer);
                 return trackPlanState(PlanState::ADVANCED);
             } else {
                 _rsCursor.reset();
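This hunk is the heart of the fix: deserializeFromKeyString() returns a row whose values point into the buffer passed to it. With 'BufBuilder buf' as a local, '_aggKeyRecordStore' held dangling pointers the moment getNext() returned; moving the buffer into the stage as '_aggKeyRSBuffer' makes the key's backing storage owned by the stage, as the commit title says. Below is a minimal sketch of the pattern, using stand-in types rather than MongoDB's BufBuilder and value::MaterializedRow:

#include <string>
#include <vector>

// Stand-ins for value::MaterializedRow and BufBuilder -- an illustrative
// sketch of the lifetime bug, not MongoDB's actual API.
struct Row {
    const char* data = nullptr;  // borrowed pointer into a decode buffer
};

// Decodes 'record' into 'buf' and returns a row that aliases 'buf', so the
// caller's buffer must outlive the returned row.
Row deserializeInto(std::vector<char>& buf, const std::string& record) {
    buf.assign(record.begin(), record.end());
    return Row{buf.data()};
}

struct HashAggLike {
    Row _aggKeyRecordStore;
    std::vector<char> _aggKeyRSBuffer;  // member: outlives each getNext() call

    void getNextBefore(const std::string& record) {
        std::vector<char> buf;                              // local, like the old 'BufBuilder buf;'
        _aggKeyRecordStore = deserializeInto(buf, record);  // row points into 'buf'...
    }                                                       // ...'buf' dies here: dangling pointers

    void getNextAfter(const std::string& record) {
        _aggKeyRSBuffer.clear();  // analogue of '_aggKeyRSBuffer.reset()'
        _aggKeyRecordStore = deserializeInto(_aggKeyRSBuffer, record);
    }  // the row stays valid until the next call rewinds the buffer
};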
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h
index 8c117e8717d..19fbca9d1c7 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.h
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.h
@@ -184,6 +184,11 @@ private:
     std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors;
     std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreAggAccessors;
 
+    // This buffer stores values for the spilled '_aggKeyRecordStore' that's loaded into memory from
+    // the '_recordStore'. Values in the '_aggKeyRecordStore' row are pointers that point to data in
+    // this buffer.
+    BufBuilder _aggKeyRSBuffer;
+
     std::vector<value::SlotAccessor*> _seekKeysAccessors;
     value::MaterializedRow _seekKeys;
 
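Two things fall out of making the buffer a member: rows handed out by getNext() stay valid until the next call, and reset() rewinds the existing allocation rather than constructing a fresh BufBuilder for every spilled row read back from the record store.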
diff --git a/src/mongo/db/exec/sbe/util/spilling.cpp b/src/mongo/db/exec/sbe/util/spilling.cpp
index 45931efec8b..c54f3bfe956 100644
--- a/src/mongo/db/exec/sbe/util/spilling.cpp
+++ b/src/mongo/db/exec/sbe/util/spilling.cpp
@@ -63,8 +63,7 @@ boost::optional<value::MaterializedRow> readFromRecordStore(OperationContext* op
     RecordData record;
     if (rs->findRecord(opCtx, rid, &record)) {
         auto valueReader = BufReader(record.data(), record.size());
-        auto val = value::MaterializedRow::deserializeForSorter(valueReader, {});
-        return val;
+        return value::MaterializedRow::deserializeForSorter(valueReader, {});
     }
     return boost::none;
 }
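The spilling.cpp hunk is a small cleanup rather than a behavior change: the named local added nothing, since the row is moved into the returned boost::optional either way. A sketch of the two shapes, with a stand-in Row type (std::optional standing in for boost::optional):

#include <optional>

struct Row {};                       // stand-in for value::MaterializedRow
Row deserialize() { return Row{}; }  // stand-in for deserializeForSorter()

// Before: materialize a named temporary, then implicitly move it into the
// optional on return.
std::optional<Row> readBefore() {
    auto val = deserialize();
    return val;
}

// After: pass the prvalue straight to the optional's converting constructor;
// same single move, one less name in scope.
std::optional<Row> readAfter() {
    return deserialize();
}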