summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/sbe/stages/hash_agg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/sbe/stages/hash_agg.cpp')
-rw-r--r--src/mongo/db/exec/sbe/stages/hash_agg.cpp39
1 files changed, 17 insertions, 22 deletions
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index e3fd62cb86b..f930d4b5e95 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -47,8 +47,9 @@ HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input,
bool optimizedClose,
boost::optional<value::SlotId> collatorSlot,
bool allowDiskUse,
- PlanNodeId planNodeId)
- : PlanStage("group"_sd, planNodeId),
+ PlanNodeId planNodeId,
+ bool participateInTrialRunTracking)
+ : PlanStage("group"_sd, planNodeId, participateInTrialRunTracking),
_gbs(std::move(gbs)),
_aggs(std::move(aggs)),
_collatorSlot(collatorSlot),
@@ -74,7 +75,8 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const {
_optimizedClose,
_collatorSlot,
_allowDiskUse,
- _commonStats.nodeId);
+ _commonStats.nodeId,
+ _participateInTrialRunTracking);
}
void HashAggStage::doSaveState(bool relinquishCursor) {
@@ -354,25 +356,18 @@ void HashAggStage::open(bool reOpen) {
key.reset(idx++, false, tag, val);
}
- if (!_recordStore) {
- // The memory limit hasn't been reached yet, accumulate state in '_ht'.
- auto [it, inserted] = _ht->try_emplace(std::move(key), value::MaterializedRow{0});
- if (inserted) {
- // Copy keys.
- const_cast<value::MaterializedRow&>(it->first).makeOwned();
- // Initialize accumulators.
- it->second.resize(_outAggAccessors.size());
- }
- // Always update the state in the '_ht' for the branch when data hasn't been
- // spilled to disk.
+
+ if (_htIt = _ht->find(key); !_recordStore && _htIt == _ht->end()) {
+ // The memory limit hasn't been reached yet, insert a new key in '_ht' by copying
+ // the key. Note as a future optimization, we should avoid the lookup in the find()
+ // call and the emplace.
+ key.makeOwned();
+ auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0});
+ // Initialize accumulators.
+ it->second.resize(_outAggAccessors.size());
_htIt = it;
- updateAggStateHt = true;
- } else {
- // The memory limit has been reached, accumulate state in '_ht' only if we
- // find the key in '_ht'.
- _htIt = _ht->find(key);
- updateAggStateHt = _htIt != _ht->end();
}
+ updateAggStateHt = _htIt != _ht->end();
if (updateAggStateHt) {
// Accumulate state in '_ht' by pointing the '_outAggAccessors' the
@@ -500,9 +495,9 @@ PlanState HashAggStage::getNext() {
KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader);
_aggValueRecordStore = val;
- BufBuilder buf;
+ _aggKeyRSBuffer.reset();
_aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString(
- decodeKeyString(nextRecord->id, typeBits), &buf);
+ decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer);
return trackPlanState(PlanState::ADVANCED);
} else {
_rsCursor.reset();