diff options
author | Eric Cox <eric.cox@mongodb.com> | 2022-06-13 17:29:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-16 19:41:29 +0000 |
commit | 342552401af6b49d37bdc2a507c5524642bb4e95 (patch) | |
tree | 8c0ae8a235c20c7ec7b78bd1056bd82903fa2a04 | |
parent | 1bf1161e5db69fcee1aaa135b18ae6d9e10e94e5 (diff) | |
download | mongo-342552401af6b49d37bdc2a507c5524642bb4e95.tar.gz |
SERVER-66989 Make HashAgg record store key owned
(cherry picked from commit e399422548621ca737903284e8b435c804bb8254)
-rw-r--r-- | jstests/noPassthrough/group_tmp_file_cleanup.js | 46 |
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.cpp | 4 |
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.h | 5 |
-rw-r--r-- | src/mongo/db/exec/sbe/util/spilling.cpp | 3 |
4 files changed, 54 insertions, 4 deletions
diff --git a/jstests/noPassthrough/group_tmp_file_cleanup.js b/jstests/noPassthrough/group_tmp_file_cleanup.js new file mode 100644 index 00000000000..4482b5d61fe --- /dev/null +++ b/jstests/noPassthrough/group_tmp_file_cleanup.js @@ -0,0 +1,46 @@ +/** + * Test that $group cleans up temporary files under dbpath + '/_tmp'. + */ + +(function() { +"use strict"; + +const memoryLimitMb = 1; +const memoryLimitBytes = memoryLimitMb * 1024 * 1024; + +// Start mongod with reduced memory limit for the $group stage. +const conn = MongoRunner.runMongod({ + setParameter: { + internalDocumentSourceGroupMaxMemoryBytes: memoryLimitBytes, + internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: memoryLimitBytes + } +}); +const testDb = conn.getDB(jsTestName()); + +// Create a collection exceeding the memory limit. +testDb.largeColl.drop(); +const largeStr = "A".repeat(1024 * 1024); // 1MB string +for (let i = 0; i < memoryLimitMb + 1; ++i) + assert.commandWorked(testDb.largeColl.insert({x: i, largeStr: largeStr + i})); + +// Inhibit optimization so that $group runs in the classic engine. +let pipeline = + [{$_internalInhibitOptimization: {}}, {$group: {_id: '$largeStr', minId: {$min: '$_id'}}}]; + +// Make sure that the pipeline needs to spill to disk. +assert.throwsWithCode(() => testDb.largeColl.aggregate(pipeline, {allowDiskUse: false}), + ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed); + +testDb.largeColl.aggregate(pipeline); +assert.eq(listFiles(conn.dbpath + "/_tmp").length, 0); + +// Run the pipeline without $_internalInhibitOptimization so that $group runs in the sbe engine. +pipeline = [{$group: {_id: '$largeStr', minId: {$min: '$_id'}}}]; + +// Make sure that the pipeline needs to spill to disk. 
+assert.throwsWithCode(() => testDb.largeColl.aggregate(pipeline, {allowDiskUse: false}), + ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed); +testDb.largeColl.aggregate(pipeline); + +MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index 73a2375d417..bdb1f7388a6 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -501,9 +501,9 @@ PlanState HashAggStage::getNext() { KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader); _aggValueRecordStore = val; - BufBuilder buf; + _aggKeyRSBuffer.reset(); _aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString( - decodeKeyString(nextRecord->id, typeBits), &buf); + decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer); return trackPlanState(PlanState::ADVANCED); } else { _rsCursor.reset(); diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h index 8c117e8717d..19fbca9d1c7 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.h +++ b/src/mongo/db/exec/sbe/stages/hash_agg.h @@ -184,6 +184,11 @@ private: std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors; std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreAggAccessors; + // This buffer stores values for the spilled '_aggKeyRecordStore' that's loaded into memory from + // the '_recordStore'. Values in the '_aggKeyRecordStore' row are pointers that point to data in + // this buffer. 
+ BufBuilder _aggKeyRSBuffer; + std::vector<value::SlotAccessor*> _seekKeysAccessors; value::MaterializedRow _seekKeys; diff --git a/src/mongo/db/exec/sbe/util/spilling.cpp b/src/mongo/db/exec/sbe/util/spilling.cpp index 45931efec8b..c54f3bfe956 100644 --- a/src/mongo/db/exec/sbe/util/spilling.cpp +++ b/src/mongo/db/exec/sbe/util/spilling.cpp @@ -63,8 +63,7 @@ boost::optional<value::MaterializedRow> readFromRecordStore(OperationContext* op RecordData record; if (rs->findRecord(opCtx, rid, &record)) { auto valueReader = BufReader(record.data(), record.size()); - auto val = value::MaterializedRow::deserializeForSorter(valueReader, {}); - return val; + return value::MaterializedRow::deserializeForSorter(valueReader, {}); } return boost::none; } |