summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Martin Neupauer <xmaton@messengeruser.com> 2020-07-28 11:18:57 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-08-07 23:21:49 +0000
commit: 52060dc0df47fb415b0560ecfb65a29cdc20b7ac (patch)
tree: 3421e1881b5f39f85af2b6db1e0119b66f413f1e /src/mongo
parent: ca1d644a6f31477b247fa79b6345528aba165281 (diff)
download: mongo-52060dc0df47fb415b0560ecfb65a29cdc20b7ac.tar.gz
SERVER-49829 - Implement spilling for Top K sort in SBE.
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/exec/sbe/stages/hash_agg.cpp | 9
-rw-r--r-- src/mongo/db/exec/sbe/stages/hash_join.cpp | 22
-rw-r--r-- src/mongo/db/exec/sbe/stages/sort.cpp | 175
-rw-r--r-- src/mongo/db/exec/sbe/stages/sort.h | 16
-rw-r--r-- src/mongo/db/exec/sbe/stages/spool.cpp | 13
-rw-r--r-- src/mongo/db/exec/sbe/values/slot.cpp | 16
-rw-r--r-- src/mongo/db/exec/sbe/values/slot.h | 186
-rw-r--r-- src/mongo/db/query/plan_cache.cpp | 2
-rw-r--r-- src/mongo/db/sorter/sorter.cpp | 16
-rw-r--r-- src/mongo/db/sorter/sorter.h | 4
10 files changed, 269 insertions, 190 deletions
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index aa5c0deb58e..8dde59a76a4 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -104,21 +104,20 @@ void HashAggStage::open(bool reOpen) {
_children[0]->open(reOpen);
while (_children[0]->getNext() == PlanState::ADVANCED) {
- value::MaterializedRow key;
- key._fields.resize(_inKeyAccessors.size());
+ value::MaterializedRow key{_inKeyAccessors.size()};
// Copy keys in order to do the lookup.
size_t idx = 0;
for (auto& p : _inKeyAccessors) {
auto [tag, val] = p->getViewOfValue();
- key._fields[idx++].reset(false, tag, val);
+ key.reset(idx++, false, tag, val);
}
- auto [it, inserted] = _ht.emplace(std::move(key), value::MaterializedRow{});
+ auto [it, inserted] = _ht.try_emplace(std::move(key), value::MaterializedRow{0});
if (inserted) {
// Copy keys.
const_cast<value::MaterializedRow&>(it->first).makeOwned();
// Initialize accumulators.
- it->second._fields.resize(_outAggAccessors.size());
+ it->second.resize(_outAggAccessors.size());
}
// Accumulate.
diff --git a/src/mongo/db/exec/sbe/stages/hash_join.cpp b/src/mongo/db/exec/sbe/stages/hash_join.cpp
index 4f93c247376..d191d561b52 100644
--- a/src/mongo/db/exec/sbe/stages/hash_join.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_join.cpp
@@ -46,7 +46,8 @@ HashJoinStage::HashJoinStage(std::unique_ptr<PlanStage> outer,
_outerCond(std::move(outerCond)),
_outerProjects(std::move(outerProjects)),
_innerCond(std::move(innerCond)),
- _innerProjects(std::move(innerProjects)) {
+ _innerProjects(std::move(innerProjects)),
+ _probeKey(0) {
if (_outerCond.size() != _innerCond.size()) {
uasserted(4822823, "left and right size do not match");
}
@@ -98,7 +99,7 @@ void HashJoinStage::prepare(CompileCtx& ctx) {
_outOuterAccessors[slot] = _outOuterProjectAccessors.back().get();
}
- _probeKey._fields.resize(_inInnerKeyAccessors.size());
+ _probeKey.resize(_inInnerKeyAccessors.size());
_compiled = true;
}
@@ -119,25 +120,22 @@ void HashJoinStage::open(bool reOpen) {
_commonStats.opens++;
_children[0]->open(reOpen);
// Insert the outer side into the hash table.
-
while (_children[0]->getNext() == PlanState::ADVANCED) {
- value::MaterializedRow key;
- value::MaterializedRow project;
- key._fields.reserve(_inOuterKeyAccessors.size());
- project._fields.reserve(_inOuterProjectAccessors.size());
+ value::MaterializedRow key{_inOuterKeyAccessors.size()};
+ value::MaterializedRow project{_inOuterProjectAccessors.size()};
+ size_t idx = 0;
// Copy keys in order to do the lookup.
for (auto& p : _inOuterKeyAccessors) {
- key._fields.push_back(value::OwnedValueAccessor{});
auto [tag, val] = p->copyOrMoveValue();
- key._fields.back().reset(true, tag, val);
+ key.reset(idx++, true, tag, val);
}
+ idx = 0;
// Copy projects.
for (auto& p : _inOuterProjectAccessors) {
- project._fields.push_back(value::OwnedValueAccessor{});
auto [tag, val] = p->copyOrMoveValue();
- project._fields.back().reset(true, tag, val);
+ project.reset(idx++, true, tag, val);
}
_ht.emplace(std::move(key), std::move(project));
@@ -168,7 +166,7 @@ PlanState HashJoinStage::getNext() {
size_t idx = 0;
for (auto& p : _inInnerKeyAccessors) {
auto [tag, val] = p->getViewOfValue();
- _probeKey._fields[idx++].reset(false, tag, val);
+ _probeKey.reset(idx++, false, tag, val);
}
auto [low, hi] = _ht.equal_range(_probeKey);
diff --git a/src/mongo/db/exec/sbe/stages/sort.cpp b/src/mongo/db/exec/sbe/stages/sort.cpp
index 453e4a98f0a..7eec22e5079 100644
--- a/src/mongo/db/exec/sbe/stages/sort.cpp
+++ b/src/mongo/db/exec/sbe/stages/sort.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/exec/sbe/stages/sort.h"
#include "mongo/db/exec/sbe/expressions/expression.h"
-#include "mongo/db/sorter/sorter.h"
#include "mongo/util/str.h"
namespace {
@@ -42,6 +41,8 @@ std::string nextFileName() {
}
} // namespace
+#include "mongo/db/sorter/sorter.cpp"
+
namespace mongo {
namespace sbe {
SortStage::SortStage(std::unique_ptr<PlanStage> input,
@@ -59,7 +60,7 @@ SortStage::SortStage(std::unique_ptr<PlanStage> input,
_limit(limit),
_memoryLimit(memoryLimit),
_allowDiskUse(allowDiskUse),
- _st(value::MaterializedRowComparator{_dirs}),
+ _mergeData({0, 0}),
_tracker(tracker) {
_children.emplace_back(std::move(input));
@@ -76,100 +77,91 @@ std::unique_ptr<PlanStage> SortStage::clone() const {
void SortStage::prepare(CompileCtx& ctx) {
_children[0]->prepare(ctx);
- value::SlotSet dupCheck;
-
size_t counter = 0;
// Process order by fields.
for (auto& slot : _obs) {
- auto [it, inserted] = dupCheck.insert(slot);
- uassert(4822812, str::stream() << "duplicate field: " << slot, inserted);
-
_inKeyAccessors.emplace_back(_children[0]->getAccessor(ctx, slot));
- std::vector<std::unique_ptr<value::SlotAccessor>> accessors;
- accessors.emplace_back(std::make_unique<SortKeyAccessor>(_stIt, counter));
- accessors.emplace_back(std::make_unique<value::MaterializedRowKeyAccessor<SorterData*>>(
- _mergeDataIt, counter));
- _outAccessors.emplace(slot, value::SwitchAccessor{std::move(accessors)});
+ auto [it, inserted] =
+ _outAccessors.emplace(slot,
+ std::make_unique<value::MaterializedRowKeyAccessor<SorterData*>>(
+ _mergeDataIt, counter));
++counter;
+ uassert(4822812, str::stream() << "duplicate field: " << slot, inserted);
}
counter = 0;
// Process value fields.
for (auto& slot : _vals) {
- auto [it, inserted] = dupCheck.insert(slot);
- uassert(4822813, str::stream() << "duplicate field: " << slot, inserted);
-
_inValueAccessors.emplace_back(_children[0]->getAccessor(ctx, slot));
- std::vector<std::unique_ptr<value::SlotAccessor>> accessors;
- accessors.emplace_back(std::make_unique<SortValueAccessor>(_stIt, counter));
- accessors.emplace_back(std::make_unique<value::MaterializedRowValueAccessor<SorterData*>>(
- _mergeDataIt, counter));
- _outAccessors.emplace(slot, value::SwitchAccessor{std::move(accessors)});
+ auto [it, inserted] = _outAccessors.emplace(
+ slot,
+ std::make_unique<value::MaterializedRowValueAccessor<SorterData*>>(_mergeDataIt,
+ counter));
++counter;
+ uassert(4822813, str::stream() << "duplicate field: " << slot, inserted);
}
}
value::SlotAccessor* SortStage::getAccessor(CompileCtx& ctx, value::SlotId slot) {
if (auto it = _outAccessors.find(slot); it != _outAccessors.end()) {
- return &it->second;
+ return it->second.get();
}
return ctx.getAccessor(slot);
}
-void SortStage::open(bool reOpen) {
- _commonStats.opens++;
- _children[0]->open(reOpen);
-
+void SortStage::makeSorter() {
SortOptions opts;
opts.tempDir = storageGlobalParams.dbpath + "/_tmp";
- std::string spillFileName = opts.tempDir + "/" + nextFileName();
- std::streampos nextSortedFileWriterOffset = 0;
- size_t memorySize = 0;
-
- _mergeIt.reset();
- _iters.clear();
+ opts.maxMemoryUsageBytes = _memoryLimit;
+ opts.extSortAllowed = _allowDiskUse;
+ opts.limit = _limit != std::numeric_limits<size_t>::max() ? _limit : 0;
+
+ auto comp = [&](const SorterData& lhs, const SorterData& rhs) {
+ auto size = lhs.first.size();
+ auto& left = lhs.first;
+ auto& right = rhs.first;
+ for (size_t idx = 0; idx < size; ++idx) {
+ auto [lhsTag, lhsVal] = left.getViewOfValue(idx);
+ auto [rhsTag, rhsVal] = right.getViewOfValue(idx);
+ auto [tag, val] = value::compareValue(lhsTag, lhsVal, rhsTag, rhsVal);
+
+ auto result = value::bitcastTo<int32_t>(val);
+ if (result) {
+ return _dirs[idx] == value::SortDirection::Descending ? -result : result;
+ }
+ }
- auto spill = [&]() {
- SortedFileWriter<value::MaterializedRow, value::MaterializedRow> writer{
- opts, spillFileName, nextSortedFileWriterOffset};
+ return 0;
+ };
- for (auto& [k, v] : _st) {
- writer.addAlreadySorted(k, v);
- }
- _st.clear();
- memorySize = 0;
+ _sorter.reset(Sorter<value::MaterializedRow, value::MaterializedRow>::make(opts, comp, {}));
+ _mergeIt.reset();
+}
- auto iteratorPtr = writer.done();
- nextSortedFileWriterOffset = writer.getFileEndOffset();
+void SortStage::open(bool reOpen) {
+ _commonStats.opens++;
+ _children[0]->open(reOpen);
- _iters.push_back(std::shared_ptr<SorterIterator>(iteratorPtr));
- };
+ makeSorter();
while (_children[0]->getNext() == PlanState::ADVANCED) {
- value::MaterializedRow keys;
- value::MaterializedRow vals;
- keys._fields.reserve(_inKeyAccessors.size());
- vals._fields.reserve(_inValueAccessors.size());
+ value::MaterializedRow keys{_inKeyAccessors.size()};
+ value::MaterializedRow vals{_inValueAccessors.size()};
+ size_t idx = 0;
for (auto accesor : _inKeyAccessors) {
- keys._fields.push_back(value::OwnedValueAccessor{});
auto [tag, val] = accesor->copyOrMoveValue();
- keys._fields.back().reset(true, tag, val);
+ keys.reset(idx++, true, tag, val);
}
+
+ idx = 0;
for (auto accesor : _inValueAccessors) {
- vals._fields.push_back(value::OwnedValueAccessor{});
auto [tag, val] = accesor->copyOrMoveValue();
- vals._fields.back().reset(true, tag, val);
+ vals.reset(idx++, true, tag, val);
}
- memorySize += keys.memUsageForSorter();
- memorySize += vals.memUsageForSorter();
-
- _st.emplace(std::move(keys), std::move(vals));
- if (_st.size() - 1 == _limit) {
- _st.erase(--_st.end());
- }
+ _sorter->emplace(std::move(keys), std::move(vals));
if (_tracker && _tracker->trackProgress<TrialRunProgressTracker::kNumResults>(1)) {
// If we either hit the maximum number of document to return during the trial run, or
@@ -184,81 +176,28 @@ void SortStage::open(bool reOpen) {
_children[0]->close();
uasserted(ErrorCodes::QueryTrialRunCompleted, "Trial run early exit");
}
-
- // Test if we have to spill
- // TODO SERVER-49829 - topk spilling
- if (_limit == std::numeric_limits<std::size_t>::max() && memorySize > _memoryLimit) {
- uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- str::stream()
- << "Sort exceeded memory limit of " << _memoryLimit
- << " bytes, but did not opt in to external sorting. Aborting operation."
- << " Pass allowDiskUse:true to opt in.",
- _allowDiskUse);
-
- spill();
- }
}
- if (!_iters.empty()) {
- // Spill the last part that still sits in memory.
- if (!_st.empty()) {
- spill();
- }
+ _mergeIt.reset(_sorter->done());
- _mergeIt.reset(SorterIterator::merge(
- _iters, spillFileName, opts, [&](const SorterData& lhs, const SorterData& rhs) {
- for (size_t idx = 0; idx < lhs.first._fields.size(); ++idx) {
- auto [lhsTag, lhsVal] = lhs.first._fields[idx].getViewOfValue();
- auto [rhsTag, rhsVal] = rhs.first._fields[idx].getViewOfValue();
- auto [tag, val] = value::compareValue(lhsTag, lhsVal, rhsTag, rhsVal);
- invariant(tag == value::TypeTags::NumberInt32);
- auto result = value::bitcastTo<int32_t>(val);
- if (val) {
- return _dirs[idx] == value::SortDirection::Descending ? -result : result;
- }
- }
-
- return 0;
- }));
-
- // Switch all output accessors to point to the spilled data.
- for (auto&& [_, acc] : _outAccessors) {
- acc.setIndex(1);
- }
- }
_children[0]->close();
-
- _stIt = _st.end();
}
PlanState SortStage::getNext() {
// When the sort spilled data to disk then read back the sorted runs.
- if (_mergeIt) {
- if (_mergeIt->more()) {
- _mergeData = _mergeIt->next();
+ if (_mergeIt && _mergeIt->more()) {
+ _mergeData = _mergeIt->next();
- return trackPlanState(PlanState::ADVANCED);
- } else {
- return trackPlanState(PlanState::IS_EOF);
- }
- }
-
- if (_stIt == _st.end()) {
- _stIt = _st.begin();
+ return trackPlanState(PlanState::ADVANCED);
} else {
- ++_stIt;
- }
-
- if (_stIt == _st.end()) {
return trackPlanState(PlanState::IS_EOF);
}
-
- return trackPlanState(PlanState::ADVANCED);
}
void SortStage::close() {
_commonStats.closes++;
- _st.clear();
+ _mergeIt.reset();
+ _sorter.reset();
}
std::unique_ptr<PlanStageStats> SortStage::getStats() const {
@@ -306,5 +245,3 @@ std::vector<DebugPrinter::Block> SortStage::debugPrint() const {
}
} // namespace sbe
} // namespace mongo
-
-#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/exec/sbe/stages/sort.h b/src/mongo/db/exec/sbe/stages/sort.h
index d16b93ec2f8..73140c476f1 100644
--- a/src/mongo/db/exec/sbe/stages/sort.h
+++ b/src/mongo/db/exec/sbe/stages/sort.h
@@ -35,6 +35,8 @@
namespace mongo {
template <typename Key, typename Value>
class SortIteratorInterface;
+template <typename Key, typename Value>
+class Sorter;
} // namespace mongo
namespace mongo::sbe {
@@ -64,11 +66,7 @@ public:
std::vector<DebugPrinter::Block> debugPrint() const final;
private:
- using TableType = std::
- multimap<value::MaterializedRow, value::MaterializedRow, value::MaterializedRowComparator>;
-
- using SortKeyAccessor = value::MaterializedRowKeyAccessor<TableType::iterator>;
- using SortValueAccessor = value::MaterializedRowValueAccessor<TableType::iterator>;
+ void makeSorter();
using SorterIterator = SortIteratorInterface<value::MaterializedRow, value::MaterializedRow>;
using SorterData = std::pair<value::MaterializedRow, value::MaterializedRow>;
@@ -83,16 +81,12 @@ private:
std::vector<value::SlotAccessor*> _inKeyAccessors;
std::vector<value::SlotAccessor*> _inValueAccessors;
- value::SlotMap<value::SwitchAccessor> _outAccessors;
-
- TableType _st;
- TableType::iterator _stIt;
+ value::SlotMap<std::unique_ptr<value::SlotAccessor>> _outAccessors;
- // Data that has already been spilled.
- std::vector<std::shared_ptr<SorterIterator>> _iters;
std::unique_ptr<SorterIterator> _mergeIt;
SorterData _mergeData;
SorterData* _mergeDataIt{&_mergeData};
+ std::unique_ptr<Sorter<value::MaterializedRow, value::MaterializedRow>> _sorter;
// If provided, used during a trial run to accumulate certain execution stats. Once the trial
// run is complete, this pointer is reset to nullptr.
diff --git a/src/mongo/db/exec/sbe/stages/spool.cpp b/src/mongo/db/exec/sbe/stages/spool.cpp
index b11dd4bc7b7..939aefd4e04 100644
--- a/src/mongo/db/exec/sbe/stages/spool.cpp
+++ b/src/mongo/db/exec/sbe/stages/spool.cpp
@@ -80,13 +80,12 @@ void SpoolEagerProducerStage::open(bool reOpen) {
}
while (_children[0]->getNext() == PlanState::ADVANCED) {
- value::MaterializedRow vals;
- vals._fields.reserve(_inAccessors.size());
+ value::MaterializedRow vals{_inAccessors.size()};
+ size_t idx = 0;
for (auto accessor : _inAccessors) {
- vals._fields.push_back(value::OwnedValueAccessor{});
auto [tag, val] = accessor->copyOrMoveValue();
- vals._fields.back().reset(true, tag, val);
+ vals.reset(idx++, true, tag, val);
}
_buffer->emplace_back(std::move(vals));
@@ -220,16 +219,14 @@ PlanState SpoolLazyProducerStage::getNext() {
if (pass) {
// We either haven't got a predicate, or it has passed. In both cases, we need pass
// through the input values, and store them into the buffer.
- value::MaterializedRow vals;
- vals._fields.reserve(_inAccessors.size());
+ value::MaterializedRow vals{_inAccessors.size()};
for (size_t idx = 0; idx < _inAccessors.size(); ++idx) {
auto [tag, val] = _inAccessors[idx]->getViewOfValue();
_outAccessors[_vals[idx]].reset(tag, val);
- vals._fields.push_back(value::OwnedValueAccessor{});
auto [copyTag, copyVal] = value::copyValue(tag, val);
- vals._fields.back().reset(true, copyTag, copyVal);
+ vals.reset(idx, true, copyTag, copyVal);
}
_buffer->emplace_back(std::move(vals));
diff --git a/src/mongo/db/exec/sbe/values/slot.cpp b/src/mongo/db/exec/sbe/values/slot.cpp
index 32a2de43f10..ec03d912b41 100644
--- a/src/mongo/db/exec/sbe/values/slot.cpp
+++ b/src/mongo/db/exec/sbe/values/slot.cpp
@@ -156,14 +156,12 @@ static std::pair<TypeTags, Value> deserializeTagVal(BufReader& buf) {
MaterializedRow MaterializedRow::deserializeForSorter(BufReader& buf,
const SorterDeserializeSettings&) {
- MaterializedRow result;
-
auto cnt = buf.read<size_t>();
- result._fields.resize(cnt);
+ MaterializedRow result{cnt};
for (size_t idx = 0; idx < cnt; ++idx) {
auto [tag, val] = deserializeTagVal(buf);
- result._fields[idx].reset(true, tag, val);
+ result.reset(idx, true, tag, val);
}
return result;
@@ -266,10 +264,10 @@ static void serializeTagValue(BufBuilder& buf, TypeTags tag, Value val) {
}
void MaterializedRow::serializeForSorter(BufBuilder& buf) const {
- buf.appendNum(_fields.size());
+ buf.appendNum(size());
- for (size_t idx = 0; idx < _fields.size(); ++idx) {
- auto [tag, val] = _fields[idx].getViewOfValue();
+ for (size_t idx = 0; idx < size(); ++idx) {
+ auto [tag, val] = getViewOfValue(idx);
serializeTagValue(buf, tag, val);
}
}
@@ -349,8 +347,8 @@ static int getApproximateSize(TypeTags tag, Value val) {
int MaterializedRow::memUsageForSorter() const {
int result = sizeof(MaterializedRow);
- for (size_t idx = 0; idx < _fields.size(); ++idx) {
- auto [tag, val] = _fields[idx].getViewOfValue();
+ for (size_t idx = 0; idx < size(); ++idx) {
+ auto [tag, val] = getViewOfValue(idx);
result += getApproximateSize(tag, val);
}
diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h
index 5cd2f661d3b..a2336250618 100644
--- a/src/mongo/db/exec/sbe/values/slot.h
+++ b/src/mongo/db/exec/sbe/values/slot.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/exec/sbe/values/value.h"
+#include <boost/container/small_vector.hpp>
namespace mongo {
class BufReader;
@@ -189,9 +190,9 @@ private:
}
}
+ bool _owned{false};
TypeTags _tag{TypeTags::Nothing};
Value _val;
- bool _owned{false};
};
/**
@@ -271,7 +272,7 @@ public:
MaterializedRowKeyAccessor(T& it, size_t slot) : _it(it), _slot(slot) {}
std::pair<TypeTags, Value> getViewOfValue() const override {
- return _it->first._fields[_slot].getViewOfValue();
+ return _it->first.getViewOfValue(_slot);
}
std::pair<TypeTags, Value> copyOrMoveValue() override {
// We can never move out values from keys.
@@ -299,14 +300,14 @@ public:
MaterializedRowValueAccessor(T& it, size_t slot) : _it(it), _slot(slot) {}
std::pair<TypeTags, Value> getViewOfValue() const override {
- return _it->second._fields[_slot].getViewOfValue();
+ return _it->second.getViewOfValue(_slot);
}
std::pair<TypeTags, Value> copyOrMoveValue() override {
- return _it->second._fields[_slot].copyOrMoveValue();
+ return _it->second.copyOrMoveValue(_slot);
}
void reset(bool owned, TypeTags tag, Value val) {
- _it->second._fields[_slot].reset(owned, tag, val);
+ _it->second.reset(_slot, owned, tag, val);
}
private:
@@ -329,14 +330,14 @@ public:
: _container(container), _it(it), _slot(slot) {}
std::pair<TypeTags, Value> getViewOfValue() const override {
- return _container[_it]._fields[_slot].getViewOfValue();
+ return _container[_it].getViewOfValue(_slot);
}
std::pair<TypeTags, Value> copyOrMoveValue() override {
- return _container[_it]._fields[_slot].copyOrMoveValue();
+ return _container[_it].copyOrMoveValue(_slot);
}
void reset(bool owned, TypeTags tag, Value val) {
- _container[_it]._fields[_slot].reset(owned, tag, val);
+ _container[_it].reset(_slot, owned, tag, val);
}
private:
@@ -345,22 +346,102 @@ private:
const size_t _slot;
};
-struct MaterializedRow {
+/**
+ * This class holds values in a buffer. It is most commonly used by the sort and hash agg plan stages.
+ */
+class MaterializedRow {
+public:
+ MaterializedRow(size_t count = 0) {
+ resize(count);
+ }
+
+ MaterializedRow(const MaterializedRow& other) {
+ resize(other.size());
+ copy(other);
+ }
+
+ MaterializedRow(MaterializedRow&& other) {
+ swap(*this, other);
+ }
+
+ ~MaterializedRow() {
+ if (_data) {
+ release();
+ delete[] _data;
+ }
+ }
+
+ MaterializedRow& operator=(MaterializedRow other) {
+ swap(*this, other);
+ return *this;
+ }
+
+ /**
+ * Make deep copies of values stored in the buffer.
+ */
void makeOwned() {
- for (auto& f : _fields) {
- auto [tag, val] = f.getViewOfValue();
- auto [copyTag, copyVal] = copyValue(tag, val);
- f.reset(copyTag, copyVal);
+ for (size_t idx = 0; idx < _count; ++idx) {
+ if (!owned()[idx]) {
+ auto [tag, val] = value::copyValue(tags()[idx], values()[idx]);
+ values()[idx] = val;
+ tags()[idx] = tag;
+ owned()[idx] = true;
+ }
+ }
+ }
+
+ std::pair<value::TypeTags, value::Value> getViewOfValue(size_t idx) const {
+ return {tags()[idx], values()[idx]};
+ }
+
+ std::pair<value::TypeTags, value::Value> copyOrMoveValue(size_t idx) {
+ if (owned()[idx]) {
+ owned()[idx] = false;
+ return {tags()[idx], values()[idx]};
+ } else {
+ return value::copyValue(tags()[idx], values()[idx]);
+ }
+ }
+
+ void reset(size_t idx, bool own, value::TypeTags tag, value::Value val) {
+ if (owned()[idx]) {
+ value::releaseValue(tags()[idx], values()[idx]);
+ owned()[idx] = false;
+ }
+ values()[idx] = val;
+ tags()[idx] = tag;
+ owned()[idx] = own;
+ }
+
+ size_t size() const {
+ return _count;
+ }
+
+ void resize(size_t count) {
+ if (_data) {
+ release();
+ delete[] _data;
+ _data = nullptr;
+ _count = 0;
+ }
+
+ if (count) {
+ _data = new char[sizeInBytes(count)];
+ _count = count;
+ auto ownedPtr = owned();
+ while (count--) {
+ *ownedPtr++ = false;
+ }
}
}
bool operator==(const MaterializedRow& rhs) const {
- for (size_t idx = 0; idx < _fields.size(); ++idx) {
- auto [lhsTag, lhsVal] = _fields[idx].getViewOfValue();
- auto [rhsTag, rhsVal] = rhs._fields[idx].getViewOfValue();
+ for (size_t idx = 0; idx < size(); ++idx) {
+ auto [lhsTag, lhsVal] = getViewOfValue(idx);
+ auto [rhsTag, rhsVal] = rhs.getViewOfValue(idx);
auto [tag, val] = compareValue(lhsTag, lhsVal, rhsTag, rhsVal);
- if (tag != TypeTags::NumberInt32 || val != 0) {
+ if (tag != value::TypeTags::NumberInt32 || val != 0) {
return false;
}
}
@@ -373,18 +454,77 @@ struct MaterializedRow {
static MaterializedRow deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&);
void serializeForSorter(BufBuilder& buf) const;
int memUsageForSorter() const;
+ auto getOwned() const {
+ auto result = *this;
+ result.makeOwned();
+ return result;
+ }
+
+private:
+ static size_t sizeInBytes(size_t count) {
+ return count * (sizeof(value::Value) + sizeof(value::TypeTags) + sizeof(bool));
+ }
+
+ value::Value* values() const {
+ return reinterpret_cast<value::Value*>(_data);
+ }
+
+ value::TypeTags* tags() const {
+ return reinterpret_cast<value::TypeTags*>(_data + _count * sizeof(value::Value));
+ }
+
+ bool* owned() const {
+ return reinterpret_cast<bool*>(_data +
+ _count * (sizeof(value::Value) + sizeof(value::TypeTags)));
+ }
+
+ void release() {
+ for (size_t idx = 0; idx < _count; ++idx) {
+ if (owned()[idx]) {
+ value::releaseValue(tags()[idx], values()[idx]);
+ owned()[idx] = false;
+ }
+ }
+ }
- std::vector<OwnedValueAccessor> _fields;
+ /**
+ * Makes a deep copy of the incoming row.
+ */
+ void copy(const MaterializedRow& other) {
+ invariant(_count == other._count);
+
+ for (size_t idx = 0; idx < _count; ++idx) {
+ if (other.owned()[idx]) {
+ auto [tag, val] = value::copyValue(other.tags()[idx], other.values()[idx]);
+ values()[idx] = val;
+ tags()[idx] = tag;
+ owned()[idx] = true;
+ } else {
+ values()[idx] = other.values()[idx];
+ tags()[idx] = other.tags()[idx];
+ owned()[idx] = false;
+ }
+ }
+ }
+
+ friend void swap(MaterializedRow& lhs, MaterializedRow& rhs) noexcept {
+ std::swap(lhs._data, rhs._data);
+ std::swap(lhs._count, rhs._count);
+ }
+
+ char* _data{nullptr};
+ size_t _count{0};
};
+
struct MaterializedRowComparator {
MaterializedRowComparator(const std::vector<value::SortDirection>& direction)
: _direction(direction) {}
bool operator()(const MaterializedRow& lhs, const MaterializedRow& rhs) const {
- for (size_t idx = 0; idx < lhs._fields.size(); ++idx) {
- auto [lhsTag, lhsVal] = lhs._fields[idx].getViewOfValue();
- auto [rhsTag, rhsVal] = rhs._fields[idx].getViewOfValue();
+ for (size_t idx = 0; idx < lhs.size(); ++idx) {
+ auto [lhsTag, lhsVal] = lhs.getViewOfValue(idx);
+ auto [rhsTag, rhsVal] = rhs.getViewOfValue(idx);
auto [tag, val] = compareValue(lhsTag, lhsVal, rhsTag, rhsVal);
if (tag != TypeTags::NumberInt32) {
return false;
@@ -410,8 +550,8 @@ struct MaterializedRowComparator {
struct MaterializedRowHasher {
std::size_t operator()(const MaterializedRow& k) const {
size_t res = 17;
- for (auto& f : k._fields) {
- auto [tag, val] = f.getViewOfValue();
+ for (size_t idx = 0; idx < k.size(); ++idx) {
+ auto [tag, val] = k.getViewOfValue(idx);
res = res * 31 + hashValue(tag, val);
}
return res;
diff --git a/src/mongo/db/query/plan_cache.cpp b/src/mongo/db/query/plan_cache.cpp
index 03dc254f272..715dd496024 100644
--- a/src/mongo/db/query/plan_cache.cpp
+++ b/src/mongo/db/query/plan_cache.cpp
@@ -582,7 +582,7 @@ Status PlanCache::set(const CanonicalQuery& query,
"match the number of solutions");
}
- const size_t newWorks = stdx::visit(
+ auto newWorks = stdx::visit(
visit_helper::Overloaded{[](std::vector<std::unique_ptr<PlanStageStats>>& stats) {
return stats[0]->common.works;
},
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 0b3030e2209..b83fe1d6619 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -142,6 +142,8 @@ public:
template <typename Container>
InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
+ InMemIterator(std::deque<Data> input) : _data(std::move(input)) {}
+
void openSource() {}
void closeSource() {}
@@ -590,12 +592,24 @@ public:
spill();
}
+ void emplace(Key&& key, Value&& val) override {
+ invariant(!_done);
+
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
+
+ _data.emplace_back(std::move(key), std::move(val));
+
+ if (_memUsed > _opts.maxMemoryUsageBytes)
+ spill();
+ }
+
Iterator* done() {
invariant(!_done);
if (this->_iters.empty()) {
sort();
- return new InMemIterator<Key, Value>(_data);
+ return new InMemIterator<Key, Value>(std::move(_data));
}
spill();
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index a3537c46a7d..1debcf5e318 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -240,7 +240,9 @@ public:
const Settings& settings = Settings());
virtual void add(const Key&, const Value&) = 0;
-
+ virtual void emplace(Key&& k, Value&& v) {
+ add(k, v);
+ }
/**
* Cannot add more data after calling done().
*