diff options
author | Martin Neupauer <xmaton@messengeruser.com> | 2020-07-28 11:18:57 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-07 23:21:49 +0000 |
commit | 52060dc0df47fb415b0560ecfb65a29cdc20b7ac (patch) | |
tree | 3421e1881b5f39f85af2b6db1e0119b66f413f1e /src/mongo/db/exec | |
parent | ca1d644a6f31477b247fa79b6345528aba165281 (diff) | |
download | mongo-52060dc0df47fb415b0560ecfb65a29cdc20b7ac.tar.gz |
SERVER-49829 - Implement spilling for Top K sort in SBE.
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_join.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/sort.cpp | 175 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/sort.h | 16 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/spool.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/values/slot.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/values/slot.h | 186 |
7 files changed, 250 insertions, 187 deletions
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index aa5c0deb58e..8dde59a76a4 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -104,21 +104,20 @@ void HashAggStage::open(bool reOpen) { _children[0]->open(reOpen); while (_children[0]->getNext() == PlanState::ADVANCED) { - value::MaterializedRow key; - key._fields.resize(_inKeyAccessors.size()); + value::MaterializedRow key{_inKeyAccessors.size()}; // Copy keys in order to do the lookup. size_t idx = 0; for (auto& p : _inKeyAccessors) { auto [tag, val] = p->getViewOfValue(); - key._fields[idx++].reset(false, tag, val); + key.reset(idx++, false, tag, val); } - auto [it, inserted] = _ht.emplace(std::move(key), value::MaterializedRow{}); + auto [it, inserted] = _ht.try_emplace(std::move(key), value::MaterializedRow{0}); if (inserted) { // Copy keys. const_cast<value::MaterializedRow&>(it->first).makeOwned(); // Initialize accumulators. - it->second._fields.resize(_outAggAccessors.size()); + it->second.resize(_outAggAccessors.size()); } // Accumulate. diff --git a/src/mongo/db/exec/sbe/stages/hash_join.cpp b/src/mongo/db/exec/sbe/stages/hash_join.cpp index 4f93c247376..d191d561b52 100644 --- a/src/mongo/db/exec/sbe/stages/hash_join.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_join.cpp @@ -46,7 +46,8 @@ HashJoinStage::HashJoinStage(std::unique_ptr<PlanStage> outer, _outerCond(std::move(outerCond)), _outerProjects(std::move(outerProjects)), _innerCond(std::move(innerCond)), - _innerProjects(std::move(innerProjects)) { + _innerProjects(std::move(innerProjects)), + _probeKey(0) { if (_outerCond.size() != _innerCond.size()) { uasserted(4822823, "left and right size do not match"); } @@ -98,7 +99,7 @@ void HashJoinStage::prepare(CompileCtx& ctx) { _outOuterAccessors[slot] = _outOuterProjectAccessors.back().get(); } - _probeKey._fields.resize(_inInnerKeyAccessors.size()); + _probeKey.resize(_inInnerKeyAccessors.size()); _compiled = true; } @@ -119,25 +120,22 @@ void HashJoinStage::open(bool reOpen) { _commonStats.opens++; _children[0]->open(reOpen); // Insert the outer side into the hash table. - while (_children[0]->getNext() == PlanState::ADVANCED) { - value::MaterializedRow key; - value::MaterializedRow project; - key._fields.reserve(_inOuterKeyAccessors.size()); - project._fields.reserve(_inOuterProjectAccessors.size()); + value::MaterializedRow key{_inOuterKeyAccessors.size()}; + value::MaterializedRow project{_inOuterProjectAccessors.size()}; + size_t idx = 0; // Copy keys in order to do the lookup. for (auto& p : _inOuterKeyAccessors) { - key._fields.push_back(value::OwnedValueAccessor{}); auto [tag, val] = p->copyOrMoveValue(); - key._fields.back().reset(true, tag, val); + key.reset(idx++, true, tag, val); } + idx = 0; // Copy projects. for (auto& p : _inOuterProjectAccessors) { - project._fields.push_back(value::OwnedValueAccessor{}); auto [tag, val] = p->copyOrMoveValue(); - project._fields.back().reset(true, tag, val); + project.reset(idx++, true, tag, val); } _ht.emplace(std::move(key), std::move(project)); @@ -168,7 +166,7 @@ PlanState HashJoinStage::getNext() { size_t idx = 0; for (auto& p : _inInnerKeyAccessors) { auto [tag, val] = p->getViewOfValue(); - _probeKey._fields[idx++].reset(false, tag, val); + _probeKey.reset(idx++, false, tag, val); } auto [low, hi] = _ht.equal_range(_probeKey); diff --git a/src/mongo/db/exec/sbe/stages/sort.cpp b/src/mongo/db/exec/sbe/stages/sort.cpp index 453e4a98f0a..7eec22e5079 100644 --- a/src/mongo/db/exec/sbe/stages/sort.cpp +++ b/src/mongo/db/exec/sbe/stages/sort.cpp @@ -32,7 +32,6 @@ #include "mongo/db/exec/sbe/stages/sort.h" #include "mongo/db/exec/sbe/expressions/expression.h" -#include "mongo/db/sorter/sorter.h" #include "mongo/util/str.h" namespace { @@ -42,6 +41,8 @@ std::string nextFileName() { } } // namespace +#include "mongo/db/sorter/sorter.cpp" + namespace mongo { namespace sbe { SortStage::SortStage(std::unique_ptr<PlanStage> input, @@ -59,7 +60,7 @@ SortStage::SortStage(std::unique_ptr<PlanStage> input, _limit(limit), _memoryLimit(memoryLimit), _allowDiskUse(allowDiskUse), - _st(value::MaterializedRowComparator{_dirs}), + _mergeData({0, 0}), _tracker(tracker) { _children.emplace_back(std::move(input)); @@ -76,100 +77,91 @@ std::unique_ptr<PlanStage> SortStage::clone() const { void SortStage::prepare(CompileCtx& ctx) { _children[0]->prepare(ctx); - value::SlotSet dupCheck; - size_t counter = 0; // Process order by fields. for (auto& slot : _obs) { - auto [it, inserted] = dupCheck.insert(slot); - uassert(4822812, str::stream() << "duplicate field: " << slot, inserted); - _inKeyAccessors.emplace_back(_children[0]->getAccessor(ctx, slot)); - std::vector<std::unique_ptr<value::SlotAccessor>> accessors; - accessors.emplace_back(std::make_unique<SortKeyAccessor>(_stIt, counter)); - accessors.emplace_back(std::make_unique<value::MaterializedRowKeyAccessor<SorterData*>>( - _mergeDataIt, counter)); - _outAccessors.emplace(slot, value::SwitchAccessor{std::move(accessors)}); + auto [it, inserted] = + _outAccessors.emplace(slot, + std::make_unique<value::MaterializedRowKeyAccessor<SorterData*>>( + _mergeDataIt, counter)); ++counter; + uassert(4822812, str::stream() << "duplicate field: " << slot, inserted); } counter = 0; // Process value fields. for (auto& slot : _vals) { - auto [it, inserted] = dupCheck.insert(slot); - uassert(4822813, str::stream() << "duplicate field: " << slot, inserted); - _inValueAccessors.emplace_back(_children[0]->getAccessor(ctx, slot)); - std::vector<std::unique_ptr<value::SlotAccessor>> accessors; - accessors.emplace_back(std::make_unique<SortValueAccessor>(_stIt, counter)); - accessors.emplace_back(std::make_unique<value::MaterializedRowValueAccessor<SorterData*>>( - _mergeDataIt, counter)); - _outAccessors.emplace(slot, value::SwitchAccessor{std::move(accessors)}); + auto [it, inserted] = _outAccessors.emplace( + slot, + std::make_unique<value::MaterializedRowValueAccessor<SorterData*>>(_mergeDataIt, + counter)); ++counter; + uassert(4822813, str::stream() << "duplicate field: " << slot, inserted); } } value::SlotAccessor* SortStage::getAccessor(CompileCtx& ctx, value::SlotId slot) { if (auto it = _outAccessors.find(slot); it != _outAccessors.end()) { - return &it->second; + return it->second.get(); } return ctx.getAccessor(slot); } -void SortStage::open(bool reOpen) { - _commonStats.opens++; - _children[0]->open(reOpen); - +void SortStage::makeSorter() { SortOptions opts; opts.tempDir = storageGlobalParams.dbpath + "/_tmp"; - std::string spillFileName = opts.tempDir + "/" + nextFileName(); - std::streampos nextSortedFileWriterOffset = 0; - size_t memorySize = 0; - - _mergeIt.reset(); - _iters.clear(); + opts.maxMemoryUsageBytes = _memoryLimit; + opts.extSortAllowed = _allowDiskUse; + opts.limit = _limit != std::numeric_limits<size_t>::max() ? _limit : 0; + + auto comp = [&](const SorterData& lhs, const SorterData& rhs) { + auto size = lhs.first.size(); + auto& left = lhs.first; + auto& right = rhs.first; + for (size_t idx = 0; idx < size; ++idx) { + auto [lhsTag, lhsVal] = left.getViewOfValue(idx); + auto [rhsTag, rhsVal] = right.getViewOfValue(idx); + auto [tag, val] = value::compareValue(lhsTag, lhsVal, rhsTag, rhsVal); + + auto result = value::bitcastTo<int32_t>(val); + if (result) { + return _dirs[idx] == value::SortDirection::Descending ? -result : result; + } + } - auto spill = [&]() { - SortedFileWriter<value::MaterializedRow, value::MaterializedRow> writer{ - opts, spillFileName, nextSortedFileWriterOffset}; + return 0; + }; - for (auto& [k, v] : _st) { - writer.addAlreadySorted(k, v); - } - _st.clear(); - memorySize = 0; + _sorter.reset(Sorter<value::MaterializedRow, value::MaterializedRow>::make(opts, comp, {})); + _mergeIt.reset(); +} - auto iteratorPtr = writer.done(); - nextSortedFileWriterOffset = writer.getFileEndOffset(); +void SortStage::open(bool reOpen) { + _commonStats.opens++; + _children[0]->open(reOpen); - _iters.push_back(std::shared_ptr<SorterIterator>(iteratorPtr)); - }; + makeSorter(); while (_children[0]->getNext() == PlanState::ADVANCED) { - value::MaterializedRow keys; - value::MaterializedRow vals; - keys._fields.reserve(_inKeyAccessors.size()); - vals._fields.reserve(_inValueAccessors.size()); + value::MaterializedRow keys{_inKeyAccessors.size()}; + value::MaterializedRow vals{_inValueAccessors.size()}; + size_t idx = 0; for (auto accesor : _inKeyAccessors) { - keys._fields.push_back(value::OwnedValueAccessor{}); auto [tag, val] = accesor->copyOrMoveValue(); - keys._fields.back().reset(true, tag, val); + keys.reset(idx++, true, tag, val); } + + idx = 0; for (auto accesor : _inValueAccessors) { - vals._fields.push_back(value::OwnedValueAccessor{}); auto [tag, val] = accesor->copyOrMoveValue(); - vals._fields.back().reset(true, tag, val); + vals.reset(idx++, true, tag, val); } - memorySize += keys.memUsageForSorter(); - memorySize += vals.memUsageForSorter(); - - _st.emplace(std::move(keys), std::move(vals)); - if (_st.size() - 1 == _limit) { - _st.erase(--_st.end()); - } + _sorter->emplace(std::move(keys), std::move(vals)); if (_tracker && _tracker->trackProgress<TrialRunProgressTracker::kNumResults>(1)) { // If we either hit the maximum number of document to return during the trial run, or @@ -184,81 +176,28 @@ void SortStage::open(bool reOpen) { _children[0]->close(); uasserted(ErrorCodes::QueryTrialRunCompleted, "Trial run early exit"); } - - // Test if we have to spill - // TODO SERVER-49829 - topk spilling - if (_limit == std::numeric_limits<std::size_t>::max() && memorySize > _memoryLimit) { - uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - str::stream() - << "Sort exceeded memory limit of " << _memoryLimit - << " bytes, but did not opt in to external sorting. Aborting operation." - << " Pass allowDiskUse:true to opt in.", - _allowDiskUse); - - spill(); - } } - if (!_iters.empty()) { - // Spill the last part that still sits in memory. - if (!_st.empty()) { - spill(); - } + _mergeIt.reset(_sorter->done()); - _mergeIt.reset(SorterIterator::merge( - _iters, spillFileName, opts, [&](const SorterData& lhs, const SorterData& rhs) { - for (size_t idx = 0; idx < lhs.first._fields.size(); ++idx) { - auto [lhsTag, lhsVal] = lhs.first._fields[idx].getViewOfValue(); - auto [rhsTag, rhsVal] = rhs.first._fields[idx].getViewOfValue(); - auto [tag, val] = value::compareValue(lhsTag, lhsVal, rhsTag, rhsVal); - invariant(tag == value::TypeTags::NumberInt32); - auto result = value::bitcastTo<int32_t>(val); - if (val) { - return _dirs[idx] == value::SortDirection::Descending ? -result : result; - } - } - - return 0; - })); - - // Switch all output accessors to point to the spilled data. - for (auto&& [_, acc] : _outAccessors) { - acc.setIndex(1); - } - } _children[0]->close(); - - _stIt = _st.end(); } PlanState SortStage::getNext() { // When the sort spilled data to disk then read back the sorted runs. - if (_mergeIt) { - if (_mergeIt->more()) { - _mergeData = _mergeIt->next(); + if (_mergeIt && _mergeIt->more()) { + _mergeData = _mergeIt->next(); - return trackPlanState(PlanState::ADVANCED); - } else { - return trackPlanState(PlanState::IS_EOF); - } - } - - if (_stIt == _st.end()) { - _stIt = _st.begin(); + return trackPlanState(PlanState::ADVANCED); } else { - ++_stIt; - } - - if (_stIt == _st.end()) { return trackPlanState(PlanState::IS_EOF); } - - return trackPlanState(PlanState::ADVANCED); } void SortStage::close() { _commonStats.closes++; - _st.clear(); + _mergeIt.reset(); + _sorter.reset(); } std::unique_ptr<PlanStageStats> SortStage::getStats() const { @@ -306,5 +245,3 @@ std::vector<DebugPrinter::Block> SortStage::debugPrint() const { } } // namespace sbe } // namespace mongo - -#include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/exec/sbe/stages/sort.h b/src/mongo/db/exec/sbe/stages/sort.h index d16b93ec2f8..73140c476f1 100644 --- a/src/mongo/db/exec/sbe/stages/sort.h +++ b/src/mongo/db/exec/sbe/stages/sort.h @@ -35,6 +35,8 @@ namespace mongo { template <typename Key, typename Value> class SortIteratorInterface; +template <typename Key, typename Value> +class Sorter; } // namespace mongo namespace mongo::sbe { @@ -64,11 +66,7 @@ public: std::vector<DebugPrinter::Block> debugPrint() const final; private: - using TableType = std:: - multimap<value::MaterializedRow, value::MaterializedRow, value::MaterializedRowComparator>; - - using SortKeyAccessor = value::MaterializedRowKeyAccessor<TableType::iterator>; - using SortValueAccessor = value::MaterializedRowValueAccessor<TableType::iterator>; + void makeSorter(); using SorterIterator = SortIteratorInterface<value::MaterializedRow, value::MaterializedRow>; using SorterData = std::pair<value::MaterializedRow, value::MaterializedRow>; @@ -83,16 +81,12 @@ private: std::vector<value::SlotAccessor*> _inKeyAccessors; std::vector<value::SlotAccessor*> _inValueAccessors; - value::SlotMap<value::SwitchAccessor> _outAccessors; - - TableType _st; - TableType::iterator _stIt; + value::SlotMap<std::unique_ptr<value::SlotAccessor>> _outAccessors; - // Data that has already been spilled. - std::vector<std::shared_ptr<SorterIterator>> _iters; std::unique_ptr<SorterIterator> _mergeIt; SorterData _mergeData; SorterData* _mergeDataIt{&_mergeData}; + std::unique_ptr<Sorter<value::MaterializedRow, value::MaterializedRow>> _sorter; // If provided, used during a trial run to accumulate certain execution stats. Once the trial // run is complete, this pointer is reset to nullptr. diff --git a/src/mongo/db/exec/sbe/stages/spool.cpp b/src/mongo/db/exec/sbe/stages/spool.cpp index b11dd4bc7b7..939aefd4e04 100644 --- a/src/mongo/db/exec/sbe/stages/spool.cpp +++ b/src/mongo/db/exec/sbe/stages/spool.cpp @@ -80,13 +80,12 @@ void SpoolEagerProducerStage::open(bool reOpen) { } while (_children[0]->getNext() == PlanState::ADVANCED) { - value::MaterializedRow vals; - vals._fields.reserve(_inAccessors.size()); + value::MaterializedRow vals{_inAccessors.size()}; + size_t idx = 0; for (auto accessor : _inAccessors) { - vals._fields.push_back(value::OwnedValueAccessor{}); auto [tag, val] = accessor->copyOrMoveValue(); - vals._fields.back().reset(true, tag, val); + vals.reset(idx++, true, tag, val); } _buffer->emplace_back(std::move(vals)); @@ -220,16 +219,14 @@ PlanState SpoolLazyProducerStage::getNext() { if (pass) { // We either haven't got a predicate, or it has passed. In both cases, we need pass // through the input values, and store them into the buffer. - value::MaterializedRow vals; - vals._fields.reserve(_inAccessors.size()); + value::MaterializedRow vals{_inAccessors.size()}; for (size_t idx = 0; idx < _inAccessors.size(); ++idx) { auto [tag, val] = _inAccessors[idx]->getViewOfValue(); _outAccessors[_vals[idx]].reset(tag, val); - vals._fields.push_back(value::OwnedValueAccessor{}); auto [copyTag, copyVal] = value::copyValue(tag, val); - vals._fields.back().reset(true, copyTag, copyVal); + vals.reset(idx, true, copyTag, copyVal); } _buffer->emplace_back(std::move(vals)); diff --git a/src/mongo/db/exec/sbe/values/slot.cpp b/src/mongo/db/exec/sbe/values/slot.cpp index 32a2de43f10..ec03d912b41 100644 --- a/src/mongo/db/exec/sbe/values/slot.cpp +++ b/src/mongo/db/exec/sbe/values/slot.cpp @@ -156,14 +156,12 @@ static std::pair<TypeTags, Value> deserializeTagVal(BufReader& buf) { MaterializedRow MaterializedRow::deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) { - MaterializedRow result; - auto cnt = buf.read<size_t>(); - result._fields.resize(cnt); + MaterializedRow result{cnt}; for (size_t idx = 0; idx < cnt; ++idx) { auto [tag, val] = deserializeTagVal(buf); - result._fields[idx].reset(true, tag, val); + result.reset(idx, true, tag, val); } return result; @@ -266,10 +264,10 @@ static void serializeTagValue(BufBuilder& buf, TypeTags tag, Value val) { } void MaterializedRow::serializeForSorter(BufBuilder& buf) const { - buf.appendNum(_fields.size()); + buf.appendNum(size()); - for (size_t idx = 0; idx < _fields.size(); ++idx) { - auto [tag, val] = _fields[idx].getViewOfValue(); + for (size_t idx = 0; idx < size(); ++idx) { + auto [tag, val] = getViewOfValue(idx); serializeTagValue(buf, tag, val); } } @@ -349,8 +347,8 @@ static int getApproximateSize(TypeTags tag, Value val) { int MaterializedRow::memUsageForSorter() const { int result = sizeof(MaterializedRow); - for (size_t idx = 0; idx < _fields.size(); ++idx) { - auto [tag, val] = _fields[idx].getViewOfValue(); + for (size_t idx = 0; idx < size(); ++idx) { + auto [tag, val] = getViewOfValue(idx); result += getApproximateSize(tag, val); } diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h index 5cd2f661d3b..a2336250618 100644 --- a/src/mongo/db/exec/sbe/values/slot.h +++ b/src/mongo/db/exec/sbe/values/slot.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/exec/sbe/values/value.h" +#include <boost/container/small_vector.hpp> namespace mongo { class BufReader; @@ -189,9 +190,9 @@ private: } } + bool _owned{false}; TypeTags _tag{TypeTags::Nothing}; Value _val; - bool _owned{false}; }; /** @@ -271,7 +272,7 @@ public: MaterializedRowKeyAccessor(T& it, size_t slot) : _it(it), _slot(slot) {} std::pair<TypeTags, Value> getViewOfValue() const override { - return _it->first._fields[_slot].getViewOfValue(); + return _it->first.getViewOfValue(_slot); } std::pair<TypeTags, Value> copyOrMoveValue() override { // We can never move out values from keys. @@ -299,14 +300,14 @@ public: MaterializedRowValueAccessor(T& it, size_t slot) : _it(it), _slot(slot) {} std::pair<TypeTags, Value> getViewOfValue() const override { - return _it->second._fields[_slot].getViewOfValue(); + return _it->second.getViewOfValue(_slot); } std::pair<TypeTags, Value> copyOrMoveValue() override { - return _it->second._fields[_slot].copyOrMoveValue(); + return _it->second.copyOrMoveValue(_slot); } void reset(bool owned, TypeTags tag, Value val) { - _it->second._fields[_slot].reset(owned, tag, val); + _it->second.reset(_slot, owned, tag, val); } private: @@ -329,14 +330,14 @@ public: : _container(container), _it(it), _slot(slot) {} std::pair<TypeTags, Value> getViewOfValue() const override { - return _container[_it]._fields[_slot].getViewOfValue(); + return _container[_it].getViewOfValue(_slot); } std::pair<TypeTags, Value> copyOrMoveValue() override { - return _container[_it]._fields[_slot].copyOrMoveValue(); + return _container[_it].copyOrMoveValue(_slot); } void reset(bool owned, TypeTags tag, Value val) { - _container[_it]._fields[_slot].reset(owned, tag, val); + _container[_it].reset(_slot, owned, tag, val); } private: @@ -345,22 +346,102 @@ private: const size_t _slot; }; -struct MaterializedRow { +/** + * This class holds values in a buffer. The most common usage is a sort and hash agg plan stages. + */ +class MaterializedRow { +public: + MaterializedRow(size_t count = 0) { + resize(count); + } + + MaterializedRow(const MaterializedRow& other) { + resize(other.size()); + copy(other); + } + + MaterializedRow(MaterializedRow&& other) { + swap(*this, other); + } + + ~MaterializedRow() { + if (_data) { + release(); + delete[] _data; + } + } + + MaterializedRow& operator=(MaterializedRow other) { + swap(*this, other); + return *this; + } + + /** + * Make deep copies of values stored in the buffer. + */ void makeOwned() { - for (auto& f : _fields) { - auto [tag, val] = f.getViewOfValue(); - auto [copyTag, copyVal] = copyValue(tag, val); - f.reset(copyTag, copyVal); + for (size_t idx = 0; idx < _count; ++idx) { + if (!owned()[idx]) { + auto [tag, val] = value::copyValue(tags()[idx], values()[idx]); + values()[idx] = val; + tags()[idx] = tag; + owned()[idx] = true; + } + } + } + + std::pair<value::TypeTags, value::Value> getViewOfValue(size_t idx) const { + return {tags()[idx], values()[idx]}; + } + + std::pair<value::TypeTags, value::Value> copyOrMoveValue(size_t idx) { + if (owned()[idx]) { + owned()[idx] = false; + return {tags()[idx], values()[idx]}; + } else { + return value::copyValue(tags()[idx], values()[idx]); + } + } + + void reset(size_t idx, bool own, value::TypeTags tag, value::Value val) { + if (owned()[idx]) { + value::releaseValue(tags()[idx], values()[idx]); + owned()[idx] = false; + } + values()[idx] = val; + tags()[idx] = tag; + owned()[idx] = own; + } + + size_t size() const { + return _count; + } + + void resize(size_t count) { + if (_data) { + release(); + delete[] _data; + _data = nullptr; + _count = 0; + } + + if (count) { + _data = new char[sizeInBytes(count)]; + _count = count; + auto ownedPtr = owned(); + while (count--) { + *ownedPtr++ = false; + } } } bool operator==(const MaterializedRow& rhs) const { - for (size_t idx = 0; idx < _fields.size(); ++idx) { - auto [lhsTag, lhsVal] = _fields[idx].getViewOfValue(); - auto [rhsTag, rhsVal] = rhs._fields[idx].getViewOfValue(); + for (size_t idx = 0; idx < size(); ++idx) { + auto [lhsTag, lhsVal] = getViewOfValue(idx); + auto [rhsTag, rhsVal] = rhs.getViewOfValue(idx); auto [tag, val] = compareValue(lhsTag, lhsVal, rhsTag, rhsVal); - if (tag != TypeTags::NumberInt32 || val != 0) { + if (tag != value::TypeTags::NumberInt32 || val != 0) { return false; } } @@ -373,18 +454,77 @@ struct MaterializedRow { static MaterializedRow deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&); void serializeForSorter(BufBuilder& buf) const; int memUsageForSorter() const; + auto getOwned() const { + auto result = *this; + result.makeOwned(); + return result; + } + +private: + static size_t sizeInBytes(size_t count) { + return count * (sizeof(value::Value) + sizeof(value::TypeTags) + sizeof(bool)); + } + + value::Value* values() const { + return reinterpret_cast<value::Value*>(_data); + } + + value::TypeTags* tags() const { + return reinterpret_cast<value::TypeTags*>(_data + _count * sizeof(value::Value)); + } + + bool* owned() const { + return reinterpret_cast<bool*>(_data + + _count * (sizeof(value::Value) + sizeof(value::TypeTags))); + } + + void release() { + for (size_t idx = 0; idx < _count; ++idx) { + if (owned()[idx]) { + value::releaseValue(tags()[idx], values()[idx]); + owned()[idx] = false; + } + } + } - std::vector<OwnedValueAccessor> _fields; + /** + * Makes a deep copy on the incoming row. + */ + void copy(const MaterializedRow& other) { + invariant(_count == other._count); + + for (size_t idx = 0; idx < _count; ++idx) { + if (other.owned()[idx]) { + auto [tag, val] = value::copyValue(other.tags()[idx], other.values()[idx]); + values()[idx] = val; + tags()[idx] = tag; + owned()[idx] = true; + } else { + values()[idx] = other.values()[idx]; + tags()[idx] = other.tags()[idx]; + owned()[idx] = false; + } + } + } + + friend void swap(MaterializedRow& lhs, MaterializedRow& rhs) noexcept { + std::swap(lhs._data, rhs._data); + std::swap(lhs._count, rhs._count); + } + + char* _data{nullptr}; + size_t _count{0}; }; + struct MaterializedRowComparator { MaterializedRowComparator(const std::vector<value::SortDirection>& direction) : _direction(direction) {} bool operator()(const MaterializedRow& lhs, const MaterializedRow& rhs) const { - for (size_t idx = 0; idx < lhs._fields.size(); ++idx) { - auto [lhsTag, lhsVal] = lhs._fields[idx].getViewOfValue(); - auto [rhsTag, rhsVal] = rhs._fields[idx].getViewOfValue(); + for (size_t idx = 0; idx < lhs.size(); ++idx) { + auto [lhsTag, lhsVal] = lhs.getViewOfValue(idx); + auto [rhsTag, rhsVal] = rhs.getViewOfValue(idx); auto [tag, val] = compareValue(lhsTag, lhsVal, rhsTag, rhsVal); if (tag != TypeTags::NumberInt32) { return false; @@ -410,8 +550,8 @@ struct MaterializedRowComparator { struct MaterializedRowHasher { std::size_t operator()(const MaterializedRow& k) const { size_t res = 17; - for (auto& f : k._fields) { - auto [tag, val] = f.getViewOfValue(); + for (size_t idx = 0; idx < k.size(); ++idx) { + auto [tag, val] = k.getViewOfValue(idx); res = res * 31 + hashValue(tag, val); } return res; |