summaryrefslogtreecommitdiff
path: root/src/mongo/db/sorter
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2013-05-07 14:29:12 -0400
committerMathias Stearn <mathias@10gen.com>2013-05-13 17:02:42 -0400
commit52f8a9555f4dd71a7787d4a7a1412a9e5e8415a2 (patch)
tree4ced329e08c47251ec5b688f27cd23efd98610f9 /src/mongo/db/sorter
parent439df8fff2e5b3140bf7d32393ce4a18cf5fd876 (diff)
downloadmongo-52f8a9555f4dd71a7787d4a7a1412a9e5e8415a2.tar.gz
SERVER-9443 First part of limit support for new framework
This isn't currently hooked up anywhere. It also doesn't yet significantly optimize Top-K sorting where the result set doesn't fit inside of our memory budget.
Diffstat (limited to 'src/mongo/db/sorter')
-rw-r--r--src/mongo/db/sorter/sorter.cpp209
-rw-r--r--src/mongo/db/sorter/sorter.h4
2 files changed, 200 insertions, 13 deletions
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index a958b46ba87..b7911ff6c6e 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -64,6 +64,8 @@ namespace mongo {
public:
typedef std::pair<Key, Value> Data;
+ InMemIterator(const Data& singleValue) :_data(1, singleValue) {}
+
template <typename Container>
InMemIterator(const Container& input) :_data(input.begin(), input.end()) {}
@@ -175,7 +177,7 @@ namespace mongo {
const SortOptions& opts,
const Comparator& comp)
: _opts(opts)
- , _done(false)
+ , _remaining(opts.limit ? opts.limit : numeric_limits<unsigned long long>::max())
, _first(true)
, _greater(comp)
{
@@ -187,7 +189,7 @@ namespace mongo {
}
if (_heap.empty()) {
- _done = true;
+ _remaining = 0;
return;
}
@@ -197,9 +199,22 @@ namespace mongo {
_heap.pop_back();
}
- bool more() { return !_done && (_first || !_heap.empty() || _current->more()); }
+ bool more() {
+ if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
+ return true;
+
+ // We are done so clean up resources.
+ // Can't do this in next() due to lifetime guarantees of unowned Data.
+ _heap.clear();
+ _current.reset();
+
+ return false;
+ }
+
Data next() {
- verify(!_done);
+ verify(_remaining);
+
+ _remaining--;
if (_first) {
_first = false;
@@ -249,7 +264,7 @@ namespace mongo {
class STLComparator { // uses greater rather than less-than to maintain a MinHeap
public:
- STLComparator(const Comparator& comp) : _comp(comp) {}
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
bool operator () (ptr<const Stream> lhs, ptr<const Stream> rhs) const {
// first compare data
int ret = _comp(lhs->current(), rhs->current());
@@ -264,7 +279,7 @@ namespace mongo {
};
SortOptions _opts;
- bool _done;
+ unsigned long long _remaining;
bool _first;
boost::shared_ptr<Stream> _current;
std::vector<boost::shared_ptr<Stream> > _heap; // MinHeap
@@ -343,7 +358,7 @@ namespace mongo {
, _settings(settings)
, _opts(opts)
, _memUsed(0)
- {}
+ { verify(_opts.limit == 0); }
void add(const Key& key, const Value& val) {
_data.push_back(std::make_pair(key, val));
@@ -372,7 +387,7 @@ namespace mongo {
private:
class STLComparator {
public:
- STLComparator(const Comparator& comp) : _comp(comp) {}
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
bool operator () (const Data& lhs, const Data& rhs) const {
return _comp(lhs, rhs) < 0;
}
@@ -419,16 +434,186 @@ namespace mongo {
std::vector<boost::shared_ptr<Iterator> > _iters; // data that has already been spilled
};
+ template <typename Key, typename Value, typename Comparator>
+ class LimitOneSorter : public Sorter<Key, Value> {
+ // Since this class is only used for limit==1, it omits all logic to
+ // spill to disk and only tracks memory usage if explicitly requested.
+ public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+
+ LimitOneSorter(const SortOptions& opts, const Comparator& comp)
+ : _comp(comp)
+ { verify(opts.limit == 1); }
+
+ void add(const Key& key, const Value& val) {
+ Data contender(key, val);
+ if (_comp(_best, contender) <= 0)
+ return; // not good enough
+
+ _best = contender;
+ }
+
+ Iterator* done() {
+ return new sorter::InMemIterator<Key, Value>(_best);
+ }
+
+ // TEMP these are here for compatibility. Will be replaced with a general stats API
+ int numFiles() const { return 0; }
+ size_t memUsed() const { return _best.first.memUsageForSorter()
+ + _best.second.memUsageForSorter(); }
+
+ private:
+ const Comparator _comp;
+ Data _best;
+ };
+
+ template <typename Key, typename Value, typename Comparator>
+ class TopKSorter : public Sorter<Key, Value> {
+ public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings
+ ,typename Value::SorterDeserializeSettings
+ > Settings;
+
+ TopKSorter(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : _comp(comp)
+ , _settings(settings)
+ , _opts(opts)
+ , _memUsed(0)
+ {
+ // This also *works* with limit==1 but LimitOneSorter should be used instead
+ verify(_opts.limit > 1);
+
+ // Preallocate a fixed sized vector of the required size if we
+ // don't expect it to have a major impact on our memory budget.
+ // This is the common case with small limits.
+ if ((sizeof(Data) * opts.limit) < opts.maxMemoryUsageBytes / 10) {
+ _data.reserve(opts.limit);
+ }
+ }
+
+ void add(const Key& key, const Value& val) {
+ STLComparator less(_comp);
+
+ if (_data.size() < _opts.limit) {
+ _data.push_back(std::make_pair(key, val));
+
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
+
+ if (_data.size() == _opts.limit)
+ std::make_heap(_data.begin(), _data.end(), less);
+
+ if (_memUsed > _opts.maxMemoryUsageBytes)
+ spill();
+
+ return;
+ }
+
+ verify(_data.size() == _opts.limit);
+
+ Data contender(key, val);
+ if (_comp(_data.front(), contender) <= 0)
+ return; // not good enough
+
+ // Remove the old worst pair and insert the contender, adjusting _memUsed
+
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
+
+ _memUsed -= _data.front().first.memUsageForSorter();
+ _memUsed -= _data.front().second.memUsageForSorter();
+
+ std::pop_heap(_data.begin(), _data.end(), less);
+ _data.back() = contender;
+ std::push_heap(_data.begin(), _data.end(), less);
+
+ if (_memUsed > _opts.maxMemoryUsageBytes)
+ spill();
+ }
+
+ Iterator* done() {
+ if (_iters.empty()) {
+ sort();
+ return new sorter::InMemIterator<Key, Value>(_data);
+ }
+
+ spill();
+ return Iterator::merge(_iters, _opts, _comp);
+ }
+
+ // TEMP these are here for compatibility. Will be replaced with a general stats API
+ int numFiles() const { return _iters.size(); }
+ size_t memUsed() const { return _memUsed; }
+
+ private:
+ class STLComparator {
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator () (const Data& lhs, const Data& rhs) const {
+ return _comp(lhs, rhs) < 0;
+ }
+ private:
+ const Comparator& _comp;
+ };
+
+ void sort() {
+ STLComparator less(_comp);
+
+ if (_data.size() == _opts.limit) {
+ std::sort_heap(_data.begin(), _data.end(), less);
+ } else {
+ std::stable_sort(_data.begin(), _data.end(), less);
+ }
+ }
+
+ void spill() {
+ if (_data.empty())
+ return;
+
+ if (!_opts.extSortAllowed)
+ uasserted(16820, str::stream()
+ << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
+ << " bytes, but did not opt-in to external sorting. Aborting operation."
+ );
+
+ sort();
+
+ SortedFileWriter<Key, Value> writer(_settings);
+ for (size_t i=0; i<_data.size(); i++) {
+ writer.addAlreadySorted(_data[i].first, _data[i].second);
+ }
+
+ // clear _data and release backing array's memory
+ std::vector<Data>().swap(_data);
+
+ _iters.push_back(boost::shared_ptr<Iterator>(writer.done()));
+
+ _memUsed = 0;
+ }
+
+ const Comparator _comp;
+ const Settings _settings;
+ SortOptions _opts;
+ size_t _memUsed;
+ std::vector<Data> _data; // the "current" data
+ std::vector<boost::shared_ptr<Iterator> > _iters; // data that has already been spilled
+ };
+
template <typename Key, typename Value>
template <typename Comparator>
Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
const Comparator& comp,
const Settings& settings) {
- if (opts.limit == 0) {
- return new NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
+ switch (opts.limit) {
+ case 0: return new NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
+ case 1: return new LimitOneSorter<Key, Value, Comparator>(opts, comp);
+ default: return new TopKSorter<Key, Value, Comparator>(opts, comp, settings);
}
-
- verify(!"limit not yet supported");
}
}
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 0c81e758a39..8ae0153dae5 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -80,7 +80,7 @@ namespace mongo {
* Runtime options that control the Sorter's behavior
*/
struct SortOptions {
- long long limit; /// number of KV pairs to be returned. 0 for no limit.
+ unsigned long long limit; /// number of KV pairs to be returned. 0 for no limit.
size_t maxMemoryUsageBytes; /// Approximate.
bool extSortAllowed; /// If false, uassert if more mem needed than allowed.
SortOptions()
@@ -173,6 +173,8 @@ namespace mongo {
#define MONGO_CREATE_SORTER(Key, Value, Comparator) \
template class ::mongo::Sorter<Key, Value>; \
template class ::mongo::NoLimitSorter<Key, Value, Comparator>; \
+ template class ::mongo::LimitOneSorter<Key, Value, Comparator>; \
+ template class ::mongo::TopKSorter<Key, Value, Comparator>; \
template class ::mongo::SortIteratorInterface<Key, Value>; \
template class ::mongo::SortedFileWriter<Key, Value>; \
template class ::mongo::sorter::MergeIterator<Key, Value, Comparator>; \