summaryrefslogtreecommitdiff
path: root/src/mongo/db/sorter/sorter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/sorter/sorter.cpp')
-rw-r--r--src/mongo/db/sorter/sorter.cpp1460
1 files changed, 740 insertions, 720 deletions
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 17444f24d9d..e589c9a8001 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -63,831 +63,851 @@
#include "mongo/util/unowned_ptr.h"
namespace mongo {
- namespace sorter {
+namespace sorter {
- using std::shared_ptr;
- using namespace mongoutils;
+using std::shared_ptr;
+using namespace mongoutils;
- // We need to use the "real" errno everywhere, not GetLastError() on Windows
- inline std::string myErrnoWithDescription() {
- int errnoCopy = errno;
- StringBuilder sb;
- sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy);
- return sb.str();
- }
+// We need to use the "real" errno everywhere, not GetLastError() on Windows
+inline std::string myErrnoWithDescription() {
+ int errnoCopy = errno;
+ StringBuilder sb;
+ sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy);
+ return sb.str();
+}
- template<typename Data, typename Comparator>
- void compIsntSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
- PRINT(typeid(comp).name());
- PRINT(lhs.first);
- PRINT(lhs.second);
- PRINT(rhs.first);
- PRINT(rhs.second);
- PRINT(comp(lhs, rhs));
- PRINT(comp(rhs, lhs));
- dassert(false);
- }
+template <typename Data, typename Comparator>
+void compIsntSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
+ PRINT(typeid(comp).name());
+ PRINT(lhs.first);
+ PRINT(lhs.second);
+ PRINT(rhs.first);
+ PRINT(rhs.second);
+ PRINT(comp(lhs, rhs));
+ PRINT(comp(rhs, lhs));
+ dassert(false);
+}
- template<typename Data, typename Comparator>
- void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
+template <typename Data, typename Comparator>
+void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER)
- // MSVC++ already does similar verification in debug mode in addition to using
- // algorithms that do more comparisons. Doing our own verification in addition makes
- // debug builds considerably slower without any additional safety.
-
- // test reversed comparisons
- const int regular = comp(lhs, rhs);
- if (regular == 0) {
- if (!(comp(rhs, lhs) == 0)) compIsntSane(comp, lhs, rhs);
- } else if (regular < 0) {
- if (!(comp(rhs, lhs) > 0)) compIsntSane(comp, lhs, rhs);
- } else /*regular > 0*/ {
- if (!(comp(rhs, lhs) < 0)) compIsntSane(comp, lhs, rhs);
- }
+ // MSVC++ already does similar verification in debug mode in addition to using
+ // algorithms that do more comparisons. Doing our own verification in addition makes
+ // debug builds considerably slower without any additional safety.
+
+ // test reversed comparisons
+ const int regular = comp(lhs, rhs);
+ if (regular == 0) {
+ if (!(comp(rhs, lhs) == 0))
+ compIsntSane(comp, lhs, rhs);
+ } else if (regular < 0) {
+ if (!(comp(rhs, lhs) > 0))
+ compIsntSane(comp, lhs, rhs);
+ } else /*regular > 0*/ {
+ if (!(comp(rhs, lhs) < 0))
+ compIsntSane(comp, lhs, rhs);
+ }
- // test reflexivity
- if (!(comp(lhs, lhs) == 0)) compIsntSane(comp, lhs, lhs);
- if (!(comp(rhs, rhs) == 0)) compIsntSane(comp, rhs, rhs);
+ // test reflexivity
+ if (!(comp(lhs, lhs) == 0))
+ compIsntSane(comp, lhs, lhs);
+ if (!(comp(rhs, rhs) == 0))
+ compIsntSane(comp, rhs, rhs);
#endif
- }
+}
- /** Ensures a named file is deleted when this object goes out of scope */
- class FileDeleter {
- public:
- FileDeleter(const std::string& fileName) :_fileName(fileName) {}
- ~FileDeleter() {
- DESTRUCTOR_GUARD(
- boost::filesystem::remove(_fileName);
- )
- }
- private:
- const std::string _fileName;
- };
-
- /** Returns results from sorted in-memory storage */
- template <typename Key, typename Value>
- class InMemIterator : public SortIteratorInterface<Key, Value> {
- public:
- typedef std::pair<Key, Value> Data;
-
- /// No data to iterate
- InMemIterator() {}
-
- /// Only a single value
- InMemIterator(const Data& singleValue) :_data(1, singleValue) {}
-
- /// Any number of values
- template <typename Container>
- InMemIterator(const Container& input) :_data(input.begin(), input.end()) {}
-
- bool more() { return !_data.empty(); }
- Data next() {
- Data out = _data.front();
- _data.pop_front();
- return out;
- }
+/** Ensures a named file is deleted when this object goes out of scope */
+class FileDeleter {
+public:
+ FileDeleter(const std::string& fileName) : _fileName(fileName) {}
+ ~FileDeleter() {
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName);)
+ }
- private:
- std::deque<Data> _data;
- };
-
- /** Returns results in order from a single file */
- template <typename Key, typename Value>
- class FileIterator : public SortIteratorInterface<Key, Value> {
- public:
- typedef std::pair<typename Key::SorterDeserializeSettings
- ,typename Value::SorterDeserializeSettings
- > Settings;
- typedef std::pair<Key, Value> Data;
-
- FileIterator(const std::string& fileName,
- const Settings& settings,
- std::shared_ptr<FileDeleter> fileDeleter)
- : _settings(settings)
- , _done(false)
- , _fileName(fileName)
- , _fileDeleter(fileDeleter)
- , _file(_fileName.c_str(), std::ios::in | std::ios::binary)
- {
- massert(16814, str::stream() << "error opening file \"" << _fileName << "\": "
- << myErrnoWithDescription(),
- _file.good());
-
- massert(16815, str::stream() << "unexpected empty file: " << _fileName,
- boost::filesystem::file_size(_fileName) != 0);
- }
+private:
+ const std::string _fileName;
+};
- bool more() {
- if (!_done)
- fillIfNeeded(); // may change _done
- return !_done;
- }
+/** Returns results from sorted in-memory storage */
+template <typename Key, typename Value>
+class InMemIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
- Data next() {
- verify(!_done);
- fillIfNeeded();
+ /// No data to iterate
+ InMemIterator() {}
- Data out;
- // Note: key must be read before value so can't pass directly to Data constructor
- out.first = Key::deserializeForSorter(*_reader, _settings.first);
- out.second = Value::deserializeForSorter(*_reader, _settings.second);
- return out;
- }
+ /// Only a single value
+ InMemIterator(const Data& singleValue) : _data(1, singleValue) {}
- private:
- void fillIfNeeded() {
- verify(!_done);
+ /// Any number of values
+ template <typename Container>
+ InMemIterator(const Container& input)
+ : _data(input.begin(), input.end()) {}
- if (!_reader || _reader->atEof())
- fill();
- }
+ bool more() {
+ return !_data.empty();
+ }
+ Data next() {
+ Data out = _data.front();
+ _data.pop_front();
+ return out;
+ }
+
+private:
+ std::deque<Data> _data;
+};
+
+/** Returns results in order from a single file */
+template <typename Key, typename Value>
+class FileIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings> Settings;
+ typedef std::pair<Key, Value> Data;
+
+ FileIterator(const std::string& fileName,
+ const Settings& settings,
+ std::shared_ptr<FileDeleter> fileDeleter)
+ : _settings(settings),
+ _done(false),
+ _fileName(fileName),
+ _fileDeleter(fileDeleter),
+ _file(_fileName.c_str(), std::ios::in | std::ios::binary) {
+ massert(16814,
+ str::stream() << "error opening file \"" << _fileName
+ << "\": " << myErrnoWithDescription(),
+ _file.good());
- void fill() {
- int32_t rawSize;
- read(&rawSize, sizeof(rawSize));
- if (_done) return;
+ massert(16815,
+ str::stream() << "unexpected empty file: " << _fileName,
+ boost::filesystem::file_size(_fileName) != 0);
+ }
- // negative size means compressed
- const bool compressed = rawSize < 0;
- const int32_t blockSize = std::abs(rawSize);
+ bool more() {
+ if (!_done)
+ fillIfNeeded(); // may change _done
+ return !_done;
+ }
- _buffer.reset(new char[blockSize]);
- read(_buffer.get(), blockSize);
- massert(16816, "file too short?", !_done);
+ Data next() {
+ verify(!_done);
+ fillIfNeeded();
- if (!compressed) {
- _reader.reset(new BufReader(_buffer.get(), blockSize));
- return;
- }
+ Data out;
+ // Note: key must be read before value so can't pass directly to Data constructor
+ out.first = Key::deserializeForSorter(*_reader, _settings.first);
+ out.second = Value::deserializeForSorter(*_reader, _settings.second);
+ return out;
+ }
- dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
+private:
+ void fillIfNeeded() {
+ verify(!_done);
- size_t uncompressedSize;
- massert(17061, "couldn't get uncompressed length",
- snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
+ if (!_reader || _reader->atEof())
+ fill();
+ }
- std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
- massert(17062, "decompression failed",
- snappy::RawUncompress(_buffer.get(),
- blockSize,
- decompressionBuffer.get()));
+ void fill() {
+ int32_t rawSize;
+ read(&rawSize, sizeof(rawSize));
+ if (_done)
+ return;
- // hold on to decompressed data and throw out compressed data at block exit
- _buffer.swap(decompressionBuffer);
- _reader.reset(new BufReader(_buffer.get(), uncompressedSize));
- }
+ // negative size means compressed
+ const bool compressed = rawSize < 0;
+ const int32_t blockSize = std::abs(rawSize);
- // sets _done to true on EOF - asserts on any other error
- void read(void* out, size_t size) {
- _file.read(reinterpret_cast<char*>(out), size);
- if (!_file.good()) {
- if (_file.eof()) {
- _done = true;
- return;
- }
-
- msgasserted(16817, str::stream() << "error reading file \""
- << _fileName << "\": "
- << myErrnoWithDescription());
- }
- verify(_file.gcount() == static_cast<std::streamsize>(size));
- }
+ _buffer.reset(new char[blockSize]);
+ read(_buffer.get(), blockSize);
+ massert(16816, "file too short?", !_done);
- const Settings _settings;
- bool _done;
- std::unique_ptr<char[]> _buffer;
- std::unique_ptr<BufReader> _reader;
- std::string _fileName;
- std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file
- std::ifstream _file;
- };
-
- /** Merge-sorts results from 0 or more FileIterators */
- template <typename Key, typename Value, typename Comparator>
- class MergeIterator : public SortIteratorInterface<Key, Value> {
- public:
- typedef SortIteratorInterface<Key, Value> Input;
- typedef std::pair<Key, Value> Data;
-
-
- MergeIterator(const std::vector<std::shared_ptr<Input> >& iters,
- const SortOptions& opts,
- const Comparator& comp)
- : _opts(opts)
- , _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max())
- , _first(true)
- , _greater(comp)
- {
- for (size_t i = 0; i < iters.size(); i++) {
- if (iters[i]->more()) {
- _heap.push_back(
- std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
- }
- }
-
- if (_heap.empty()) {
- _remaining = 0;
- return;
- }
-
- std::make_heap(_heap.begin(), _heap.end(), _greater);
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- _current = _heap.back();
- _heap.pop_back();
- }
+ if (!compressed) {
+ _reader.reset(new BufReader(_buffer.get(), blockSize));
+ return;
+ }
- bool more() {
- if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
- return true;
+ dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
- // We are done so clean up resources.
- // Can't do this in next() due to lifetime guarantees of unowned Data.
- _heap.clear();
- _current.reset();
- _remaining = 0;
+ size_t uncompressedSize;
+ massert(17061,
+ "couldn't get uncompressed length",
+ snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
- return false;
+ std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
+ massert(17062,
+ "decompression failed",
+ snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));
+
+ // hold on to decompressed data and throw out compressed data at block exit
+ _buffer.swap(decompressionBuffer);
+ _reader.reset(new BufReader(_buffer.get(), uncompressedSize));
+ }
+
+ // sets _done to true on EOF - asserts on any other error
+ void read(void* out, size_t size) {
+ _file.read(reinterpret_cast<char*>(out), size);
+ if (!_file.good()) {
+ if (_file.eof()) {
+ _done = true;
+ return;
}
- Data next() {
- verify(_remaining);
+ msgasserted(16817,
+ str::stream() << "error reading file \"" << _fileName
+ << "\": " << myErrnoWithDescription());
+ }
+ verify(_file.gcount() == static_cast<std::streamsize>(size));
+ }
- _remaining--;
+ const Settings _settings;
+ bool _done;
+ std::unique_ptr<char[]> _buffer;
+ std::unique_ptr<BufReader> _reader;
+ std::string _fileName;
+ std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file
+ std::ifstream _file;
+};
+
+/** Merge-sorts results from 0 or more FileIterators */
+template <typename Key, typename Value, typename Comparator>
+class MergeIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef SortIteratorInterface<Key, Value> Input;
+ typedef std::pair<Key, Value> Data;
+
+
+ MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
+ const SortOptions& opts,
+ const Comparator& comp)
+ : _opts(opts),
+ _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
+ _first(true),
+ _greater(comp) {
+ for (size_t i = 0; i < iters.size(); i++) {
+ if (iters[i]->more()) {
+ _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
+ }
+ }
- if (_first) {
- _first = false;
- return _current->current();
- }
+ if (_heap.empty()) {
+ _remaining = 0;
+ return;
+ }
- if (!_current->advance()) {
- verify(!_heap.empty());
+ std::make_heap(_heap.begin(), _heap.end(), _greater);
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ _current = _heap.back();
+ _heap.pop_back();
+ }
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- _current = _heap.back();
- _heap.pop_back();
- } else if (!_heap.empty() && _greater(_current, _heap.front())) {
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- std::swap(_current, _heap.back());
- std::push_heap(_heap.begin(), _heap.end(), _greater);
- }
+ bool more() {
+ if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
+ return true;
- return _current->current();
- }
+ // We are done so clean up resources.
+ // Can't do this in next() due to lifetime guarantees of unowned Data.
+ _heap.clear();
+ _current.reset();
+ _remaining = 0;
+ return false;
+ }
- private:
- class Stream { // Data + Iterator
- public:
- Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
- : fileNum(fileNum)
- , _current(first)
- , _rest(rest)
- {}
-
- const Data& current() const { return _current; }
- bool more() { return _rest->more(); }
- bool advance() {
- if (!_rest->more())
- return false;
-
- _current = _rest->next();
- return true;
- }
-
- const size_t fileNum;
- private:
- Data _current;
- std::shared_ptr<Input> _rest;
- };
-
- class STLComparator { // uses greater rather than less-than to maintain a MinHeap
- public:
- explicit STLComparator(const Comparator& comp) : _comp(comp) {}
- bool operator () (unowned_ptr<const Stream> lhs,
- unowned_ptr<const Stream> rhs) const {
- // first compare data
- dassertCompIsSane(_comp, lhs->current(), rhs->current());
- int ret = _comp(lhs->current(), rhs->current());
- if (ret)
- return ret > 0;
-
- // then compare fileNums to ensure stability
- return lhs->fileNum > rhs->fileNum;
- }
- private:
- const Comparator _comp;
- };
-
- SortOptions _opts;
- unsigned long long _remaining;
- bool _first;
- std::shared_ptr<Stream> _current;
- std::vector<std::shared_ptr<Stream> > _heap; // MinHeap
- STLComparator _greater; // named so calls make sense
- };
-
- template <typename Key, typename Value, typename Comparator>
- class NoLimitSorter : public Sorter<Key, Value> {
- public:
- typedef std::pair<Key, Value> Data;
- typedef SortIteratorInterface<Key, Value> Iterator;
- typedef std::pair<typename Key::SorterDeserializeSettings
- ,typename Value::SorterDeserializeSettings
- > Settings;
-
- NoLimitSorter(const SortOptions& opts,
- const Comparator& comp,
- const Settings& settings = Settings())
- : _comp(comp)
- , _settings(settings)
- , _opts(opts)
- , _memUsed(0)
- { verify(_opts.limit == 0); }
-
- void add(const Key& key, const Value& val) {
- _data.push_back(std::make_pair(key, val));
-
- _memUsed += key.memUsageForSorter();
- _memUsed += val.memUsageForSorter();
-
- if (_memUsed > _opts.maxMemoryUsageBytes)
- spill();
- }
+ Data next() {
+ verify(_remaining);
- Iterator* done() {
- if (_iters.empty()) {
- sort();
- return new InMemIterator<Key, Value>(_data);
- }
+ _remaining--;
- spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
+ if (_first) {
+ _first = false;
+ return _current->current();
+ }
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const { return _iters.size(); }
- size_t memUsed() const { return _memUsed; }
-
- private:
- class STLComparator {
- public:
- explicit STLComparator(const Comparator& comp) : _comp(comp) {}
- bool operator () (const Data& lhs, const Data& rhs) const {
- dassertCompIsSane(_comp, lhs, rhs);
- return _comp(lhs, rhs) < 0;
- }
- private:
- const Comparator& _comp;
- };
-
- void sort() {
- STLComparator less(_comp);
- std::stable_sort(_data.begin(), _data.end(), less);
-
- // Does 2x more compares than stable_sort
- // TODO test on windows
- //std::sort(_data.begin(), _data.end(), comp);
- }
+ if (!_current->advance()) {
+ verify(!_heap.empty());
- void spill() {
- if (_data.empty())
- return;
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ _current = _heap.back();
+ _heap.pop_back();
+ } else if (!_heap.empty() && _greater(_current, _heap.front())) {
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ std::swap(_current, _heap.back());
+ std::push_heap(_heap.begin(), _heap.end(), _greater);
+ }
- if (!_opts.extSortAllowed) {
- // XXX This error message is only correct for aggregation, but it is also the
- // only way this code could be hit at the moment. If the Sorter is used
- // elsewhere where extSortAllowed could possibly be false, this message will
- // need to be revisited.
- uasserted(16819, str::stream()
- << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
- << " bytes, but did not opt in to external sorting. Aborting operation."
- << " Pass allowDiskUse:true to opt in."
- );
- }
+ return _current->current();
+ }
- sort();
- SortedFileWriter<Key, Value> writer(_opts, _settings);
- for ( ; !_data.empty(); _data.pop_front()) {
- writer.addAlreadySorted(_data.front().first, _data.front().second);
- }
+private:
+ class Stream { // Data + Iterator
+ public:
+ Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
+ : fileNum(fileNum), _current(first), _rest(rest) {}
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ const Data& current() const {
+ return _current;
+ }
+ bool more() {
+ return _rest->more();
+ }
+ bool advance() {
+ if (!_rest->more())
+ return false;
- _memUsed = 0;
- }
+ _current = _rest->next();
+ return true;
+ }
- const Comparator _comp;
- const Settings _settings;
- SortOptions _opts;
- size_t _memUsed;
- std::deque<Data> _data; // the "current" data
- std::vector<std::shared_ptr<Iterator> > _iters; // data that has already been spilled
- };
-
- template <typename Key, typename Value, typename Comparator>
- class LimitOneSorter : public Sorter<Key, Value> {
- // Since this class is only used for limit==1, it omits all logic to
- // spill to disk and only tracks memory usage if explicitly requested.
- public:
- typedef std::pair<Key, Value> Data;
- typedef SortIteratorInterface<Key, Value> Iterator;
-
- LimitOneSorter(const SortOptions& opts, const Comparator& comp)
- : _comp(comp)
- , _haveData(false)
- { verify(opts.limit == 1); }
-
- void add(const Key& key, const Value& val) {
- Data contender(key, val);
-
- if (_haveData) {
- dassertCompIsSane(_comp, _best, contender);
- if (_comp(_best, contender) <= 0)
- return; // not good enough
- } else {
- _haveData = true;
- }
-
- _best = contender;
- }
+ const size_t fileNum;
+
+ private:
+ Data _current;
+ std::shared_ptr<Input> _rest;
+ };
+
+ class STLComparator { // uses greater rather than less-than to maintain a MinHeap
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator()(unowned_ptr<const Stream> lhs, unowned_ptr<const Stream> rhs) const {
+ // first compare data
+ dassertCompIsSane(_comp, lhs->current(), rhs->current());
+ int ret = _comp(lhs->current(), rhs->current());
+ if (ret)
+ return ret > 0;
+
+ // then compare fileNums to ensure stability
+ return lhs->fileNum > rhs->fileNum;
+ }
- Iterator* done() {
- if (_haveData) {
- return new InMemIterator<Key, Value>(_best);
- } else {
- return new InMemIterator<Key, Value>();
- }
- }
+ private:
+ const Comparator _comp;
+ };
+
+ SortOptions _opts;
+ unsigned long long _remaining;
+ bool _first;
+ std::shared_ptr<Stream> _current;
+ std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
+ STLComparator _greater; // named so calls make sense
+};
+
+template <typename Key, typename Value, typename Comparator>
+class NoLimitSorter : public Sorter<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings> Settings;
+
+ NoLimitSorter(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
+ verify(_opts.limit == 0);
+ }
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const { return 0; }
- size_t memUsed() const { return _best.first.memUsageForSorter()
- + _best.second.memUsageForSorter(); }
-
- private:
- const Comparator _comp;
- Data _best;
- bool _haveData; // false at start, set to true on first call to add()
- };
-
- template <typename Key, typename Value, typename Comparator>
- class TopKSorter : public Sorter<Key, Value> {
- public:
- typedef std::pair<Key, Value> Data;
- typedef SortIteratorInterface<Key, Value> Iterator;
- typedef std::pair<typename Key::SorterDeserializeSettings
- ,typename Value::SorterDeserializeSettings
- > Settings;
-
- TopKSorter(const SortOptions& opts,
- const Comparator& comp,
- const Settings& settings = Settings())
- : _comp(comp)
- , _settings(settings)
- , _opts(opts)
- , _memUsed(0)
- , _haveCutoff(false)
- , _worstCount(0)
- , _medianCount(0)
- {
- // This also *works* with limit==1 but LimitOneSorter should be used instead
- verify(_opts.limit > 1);
-
- // Preallocate a fixed sized vector of the required size if we
- // don't expect it to have a major impact on our memory budget.
- // This is the common case with small limits.
- if ((sizeof(Data) * opts.limit) < opts.maxMemoryUsageBytes / 10) {
- _data.reserve(opts.limit);
- }
- }
+ void add(const Key& key, const Value& val) {
+ _data.push_back(std::make_pair(key, val));
- void add(const Key& key, const Value& val) {
- STLComparator less(_comp);
- Data contender(key, val);
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
- if (_data.size() < _opts.limit) {
- if (_haveCutoff && !less(contender, _cutoff))
- return;
+ if (_memUsed > _opts.maxMemoryUsageBytes)
+ spill();
+ }
- _data.push_back(contender);
+ Iterator* done() {
+ if (_iters.empty()) {
+ sort();
+ return new InMemIterator<Key, Value>(_data);
+ }
- _memUsed += key.memUsageForSorter();
- _memUsed += val.memUsageForSorter();
+ spill();
+ return Iterator::merge(_iters, _opts, _comp);
+ }
- if (_data.size() == _opts.limit)
- std::make_heap(_data.begin(), _data.end(), less);
+ // TEMP these are here for compatibility. Will be replaced with a general stats API
+ int numFiles() const {
+ return _iters.size();
+ }
+ size_t memUsed() const {
+ return _memUsed;
+ }
- if (_memUsed > _opts.maxMemoryUsageBytes)
- spill();
+private:
+ class STLComparator {
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator()(const Data& lhs, const Data& rhs) const {
+ dassertCompIsSane(_comp, lhs, rhs);
+ return _comp(lhs, rhs) < 0;
+ }
- return;
- }
+ private:
+ const Comparator& _comp;
+ };
- verify(_data.size() == _opts.limit);
+ void sort() {
+ STLComparator less(_comp);
+ std::stable_sort(_data.begin(), _data.end(), less);
- if (!less(contender, _data.front()))
- return; // not good enough
+ // Does 2x more compares than stable_sort
+ // TODO test on windows
+ // std::sort(_data.begin(), _data.end(), comp);
+ }
- // Remove the old worst pair and insert the contender, adjusting _memUsed
+ void spill() {
+ if (_data.empty())
+ return;
- _memUsed += key.memUsageForSorter();
- _memUsed += val.memUsageForSorter();
+ if (!_opts.extSortAllowed) {
+ // XXX This error message is only correct for aggregation, but it is also the
+ // only way this code could be hit at the moment. If the Sorter is used
+ // elsewhere where extSortAllowed could possibly be false, this message will
+ // need to be revisited.
+ uasserted(16819,
+ str::stream()
+ << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting. Aborting operation."
+ << " Pass allowDiskUse:true to opt in.");
+ }
- _memUsed -= _data.front().first.memUsageForSorter();
- _memUsed -= _data.front().second.memUsageForSorter();
+ sort();
- std::pop_heap(_data.begin(), _data.end(), less);
- _data.back() = contender;
- std::push_heap(_data.begin(), _data.end(), less);
+ SortedFileWriter<Key, Value> writer(_opts, _settings);
+ for (; !_data.empty(); _data.pop_front()) {
+ writer.addAlreadySorted(_data.front().first, _data.front().second);
+ }
- if (_memUsed > _opts.maxMemoryUsageBytes)
- spill();
- }
+ _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
- Iterator* done() {
- if (_iters.empty()) {
- sort();
- return new InMemIterator<Key, Value>(_data);
- }
+ _memUsed = 0;
+ }
- spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
+ const Comparator _comp;
+ const Settings _settings;
+ SortOptions _opts;
+ size_t _memUsed;
+ std::deque<Data> _data; // the "current" data
+ std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
+};
+
+template <typename Key, typename Value, typename Comparator>
+class LimitOneSorter : public Sorter<Key, Value> {
+ // Since this class is only used for limit==1, it omits all logic to
+ // spill to disk and only tracks memory usage if explicitly requested.
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+
+ LimitOneSorter(const SortOptions& opts, const Comparator& comp)
+ : _comp(comp), _haveData(false) {
+ verify(opts.limit == 1);
+ }
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const { return _iters.size(); }
- size_t memUsed() const { return _memUsed; }
-
- private:
- class STLComparator {
- public:
- explicit STLComparator(const Comparator& comp) : _comp(comp) {}
- bool operator () (const Data& lhs, const Data& rhs) const {
- dassertCompIsSane(_comp, lhs, rhs);
- return _comp(lhs, rhs) < 0;
- }
- private:
- const Comparator& _comp;
- };
-
- void sort() {
- STLComparator less(_comp);
-
- if (_data.size() == _opts.limit) {
- std::sort_heap(_data.begin(), _data.end(), less);
- } else {
- std::stable_sort(_data.begin(), _data.end(), less);
- }
- }
+ void add(const Key& key, const Value& val) {
+ Data contender(key, val);
- // Can only be called after _data is sorted
- void updateCutoff() {
- // Theory of operation: We want to be able to eagerly ignore values we know will not
- // be in the TopK result set by setting _cutoff to a value we know we have at least
- // K values equal to or better than. There are two values that we track to
- // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
- // one of these values becomes the new _cutoff, its associated counter is reset to 0
- // and a new value is chosen for that member the next time we spill.
- //
- // _worstSeen is the worst value we've seen so that all kept values are better than
- // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
- // reason to consider values worse than _worstSeen so it can become the new _cutoff.
- // This technique is especially useful when the input is already roughly sorted (eg
- // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
- // that will exclude most later values, making the full TopK operation including
- // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time.
- //
- // _lastMedian was the median of the _data in the first spill() either overall or
- // following a promotion of _lastMedian to _cutoff. We count the number of kept
- // values that are better than or equal to _lastMedian in _medianCount and can
- // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming
- // reasonable median selection (which should happen when the data is completely
- // unsorted), after the first K spilled values, we will keep roughly 50% of the
- // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
- // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
- // so the expected number of kept values is O(Log(N/K) * K). The final run time if
- // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
- // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
- //
- // This leaves a currently unoptimized worst case of data that is already roughly
- // sorted, but in the wrong direction, such that the desired results are all the
- // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
- // should be trivially detectable, as a future optimization it might be nice to
- // detect this case and reverse the direction of input (if possible) which would
- // turn this into the best case described above.
- //
- // Pedantic notes: The time complexities above (which count number of comparisons)
- // ignore the sorting of batches prior to spilling to disk since they make it more
- // confusing without changing the results. If you want to add them back in, add an
- // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
- // all space complexities measure disk space rather than memory since this class is
- // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
-
- STLComparator less(_comp); // less is "better" for TopK.
-
- // Pick a new _worstSeen or _lastMedian if should.
- if (_worstCount == 0 || less(_worstSeen, _data.back())) {
- _worstSeen = _data.back();
- }
- if (_medianCount == 0) {
- size_t medianIndex = _data.size() / 2; // chooses the higher if size() is even.
- _lastMedian = _data[medianIndex];
- }
-
- // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian.
- _worstCount += _data.size(); // everything is better or equal
- typename std::vector<Data>::iterator firstWorseThanLastMedian =
- std::upper_bound(_data.begin(), _data.end(), _lastMedian, less);
- _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);
-
-
- // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should.
- if (_worstCount >= _opts.limit) {
- if (!_haveCutoff || less(_worstSeen, _cutoff)) {
- _cutoff = _worstSeen;
- _haveCutoff = true;
- }
- _worstCount = 0;
- }
- if (_medianCount >= _opts.limit) {
- if (!_haveCutoff || less(_lastMedian, _cutoff)) {
- _cutoff = _lastMedian;
- _haveCutoff = true;
- }
- _medianCount = 0;
- }
+ if (_haveData) {
+ dassertCompIsSane(_comp, _best, contender);
+ if (_comp(_best, contender) <= 0)
+ return; // not good enough
+ } else {
+ _haveData = true;
+ }
- }
+ _best = contender;
+ }
+
+ Iterator* done() {  // Builds the result iterator; it is allocated with `new` and returned raw.
+ if (_haveData) {
+ return new InMemIterator<Key, Value>(_best);  // wraps the single retained pair
+ } else {
+ return new InMemIterator<Key, Value>();  // add() was never called, so there is no pair
+ }
+ }
- void spill() {
- if (_data.empty())
- return;
+ // TEMP: these accessors exist for compatibility and will be replaced by a general stats API.
+ int numFiles() const {  // always reports zero files
+ return 0;
+ }
+ size_t memUsed() const {  // memUsageForSorter() of the key plus that of the value in _best
+ return _best.first.memUsageForSorter() + _best.second.memUsageForSorter();
+ }
- if (!_opts.extSortAllowed) {
- // XXX This error message is only correct for aggregation, but it is also the
- // only way this code could be hit at the moment. If the Sorter is used
- // elsewhere where extSortAllowed could possibly be false, this message will
- // need to be revisited.
- uasserted(16820, str::stream()
- << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
- << " bytes, but did not opt in to external sorting. Aborting operation."
- << " Pass allowDiskUse:true to opt in."
- );
- }
+private:
+ const Comparator _comp;
+ Data _best;
+ bool _haveData; // false at start, set to true on first call to add()
+};
+
+template <typename Key, typename Value, typename Comparator>
+class TopKSorter : public Sorter<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings> Settings;
+
+ TopKSorter(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : _comp(comp),
+ _settings(settings),
+ _opts(opts),
+ _memUsed(0),  // nothing buffered yet
+ _haveCutoff(false),  // _cutoff is only meaningful once updateCutoff() sets this
+ _worstCount(0),
+ _medianCount(0) {
+ // This also *works* with limit==1, but LimitOneSorter should be used instead.
+ verify(_opts.limit > 1);
+
+ // Preallocate a fixed-size vector of the required size if we
+ // don't expect it to have a major impact on our memory budget.
+ // This is the common case with small limits.
+ if ((sizeof(Data) * opts.limit) < opts.maxMemoryUsageBytes / 10) {  // under 1/10 of budget
+ _data.reserve(opts.limit);
+ }
+ }
- sort();
- updateCutoff();
+ void add(const Key& key, const Value& val) {  // Offers (key, val) as a top-K candidate.
+ STLComparator less(_comp);
+ Data contender(key, val);
- SortedFileWriter<Key, Value> writer(_opts, _settings);
- for (size_t i=0; i<_data.size(); i++) {
- writer.addAlreadySorted(_data[i].first, _data[i].second);
- }
+ if (_data.size() < _opts.limit) {  // still filling up: _data is not heap-ordered yet
+ if (_haveCutoff && !less(contender, _cutoff))  // not strictly better than the cutoff
+ return;
- // clear _data and release backing array's memory
- std::vector<Data>().swap(_data);
+ _data.push_back(contender);
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
- _memUsed = 0;
- }
+ if (_data.size() == _opts.limit)  // just reached the limit: make front() the worst kept pair
+ std::make_heap(_data.begin(), _data.end(), less);
+
+ if (_memUsed > _opts.maxMemoryUsageBytes)  // over budget: write this batch out
+ spill();
- const Comparator _comp;
- const Settings _settings;
- SortOptions _opts;
- size_t _memUsed;
- std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
- std::vector<std::shared_ptr<Iterator> > _iters; // data that has already been spilled
-
- // See updateCutoff() for a full description of how these members are used.
- bool _haveCutoff;
- Data _cutoff; // We can definitely ignore values worse than this.
- Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
- size_t _worstCount; // Number of docs better or equal to _worstSeen kept so far.
- Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit.
- size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
- };
-
- inline unsigned nextFileNumber() {
- // This is unified across all Sorter types and instances.
- static AtomicUInt32 fileCounter;
- return fileCounter.fetchAndAdd(1);
+ return;
 }
- } // namespace sorter
- //
- // SortedFileWriter
- //
+ verify(_data.size() == _opts.limit);  // from here on _data is a full heap
+ if (!less(contender, _data.front()))
+ return; // not good enough
- template <typename Key, typename Value>
- SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
- const Settings& settings)
- : _settings(settings)
- {
- namespace str = mongoutils::str;
+ // Remove the old worst pair and insert the contender, adjusting _memUsed.
- // This should be checked by consumers, but if we get here don't allow writes.
- massert(16946, "Attempting to use external sort from mongos. This is not allowed.",
- !isMongos());
+ _memUsed += key.memUsageForSorter();
+ _memUsed += val.memUsageForSorter();
- massert(17148, "Attempting to use external sort without setting SortOptions::tempDir",
- !opts.tempDir.empty());
+ _memUsed -= _data.front().first.memUsageForSorter();
+ _memUsed -= _data.front().second.memUsageForSorter();
- {
- StringBuilder sb;
- sb << opts.tempDir << "/extsort." << sorter::nextFileNumber();
- _fileName = sb.str();
- }
+ std::pop_heap(_data.begin(), _data.end(), less);  // moves the old front() to back()
+ _data.back() = contender;  // overwrite it with the new pair
+ std::push_heap(_data.begin(), _data.end(), less);  // restore heap order
- boost::filesystem::create_directories(opts.tempDir);
+ if (_memUsed > _opts.maxMemoryUsageBytes)
+ spill();
+ }
- _file.open(_fileName.c_str(), std::ios::binary | std::ios::out);
- massert(16818, str::stream() << "error opening file \"" << _fileName << "\": "
- << sorter::myErrnoWithDescription(),
- _file.good());
+ Iterator* done() {  // Finishes the sort and returns an iterator over the results.
+ if (_iters.empty()) {  // nothing was ever spilled: sort and serve straight from memory
+ sort();
+ return new InMemIterator<Key, Value>(_data);
+ }
- _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName);
+ spill();  // write out whatever is still buffered (no-op when _data is empty)
+ return Iterator::merge(_iters, _opts, _comp);  // merge every spilled run
+ }
- // throw on failure
- _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
+ // TEMP: these accessors exist for compatibility and will be replaced by a general stats API.
+ int numFiles() const {  // one entry is pushed onto _iters per spill()
+ return _iters.size();
+ }
+ size_t memUsed() const {  // running total maintained by add(); reset to 0 by spill()
+ return _memUsed;
 }
- template <typename Key, typename Value>
- void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
- key.serializeForSorter(_buffer);
- val.serializeForSorter(_buffer);
+private:
+ class STLComparator {  // Adapts the three-way Comparator to a boolean "less than" predicate.
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator()(const Data& lhs, const Data& rhs) const {
+ dassertCompIsSane(_comp, lhs, rhs);
+ return _comp(lhs, rhs) < 0;  // a negative result means lhs sorts before rhs
+ }
- if (_buffer.len() > 64*1024)
- spill();
+ private:
+ const Comparator& _comp;  // stored by reference, so the referenced comparator must outlive this
+ };
+
+ void sort() {  // Sorts _data in place according to _comp.
+ STLComparator less(_comp);
+
+ if (_data.size() == _opts.limit) {  // add() heapifies _data once it reaches this size
+ std::sort_heap(_data.begin(), _data.end(), less);
+ } else {  // fewer than limit elements: _data was never heapified
+ std::stable_sort(_data.begin(), _data.end(), less);
+ }
 }
- template <typename Key, typename Value>
- void SortedFileWriter<Key, Value>::spill() {
- namespace str = mongoutils::str;
+ // Can only be called after _data is sorted (it reads back() as the worst element).
+ void updateCutoff() {
+ // Theory of operation: We want to be able to eagerly ignore values we know will not
+ // be in the TopK result set by setting _cutoff to a value we know we have at least
+ // K values equal to or better than. There are two values that we track to
+ // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
+ // one of these values becomes the new _cutoff, its associated counter is reset to 0
+ // and a new value is chosen for that member the next time we spill.
+ //
+ // _worstSeen is the worst value we've seen so that all kept values are better than
+ // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
+ // reason to consider values worse than _worstSeen so it can become the new _cutoff.
+ // This technique is especially useful when the input is already roughly sorted (eg
+ // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
+ // that will exclude most later values, making the full TopK operation (including
+ // the MergeIterator phase) O(K) in space and O(N + K*Log(K)) in time.
+ //
+ // _lastMedian was the median of the _data in the first spill() either overall or
+ // following a promotion of _lastMedian to _cutoff. We count the number of kept
+ // values that are better than or equal to _lastMedian in _medianCount and can
+ // promote _lastMedian to _cutoff once _medianCount >= _opts.limit. Assuming
+ // reasonable median selection (which should happen when the data is completely
+ // unsorted), after the first K spilled values, we will keep roughly 50% of the
+ // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
+ // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
+ // so the expected number of kept values is O(Log(N/K) * K). The final run time if
+ // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
+ // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
+ //
+ // This leaves a currently unoptimized worst case of data that is already roughly
+ // sorted, but in the wrong direction, such that the desired results are all the
+ // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
+ // should be trivially detectable, as a future optimization it might be nice to
+ // detect this case and reverse the direction of input (if possible) which would
+ // turn this into the best case described above.
+ //
+ // Pedantic notes: The time complexities above (which count number of comparisons)
+ // ignore the sorting of batches prior to spilling to disk since they make it more
+ // confusing without changing the results. If you want to add them back in, add an
+ // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
+ // all space complexities measure disk space rather than memory since this class is
+ // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
+
+ STLComparator less(_comp); // less is "better" for TopK.
+
+ // Pick a new _worstSeen or _lastMedian if we should.
+ if (_worstCount == 0 || less(_worstSeen, _data.back())) {
+ _worstSeen = _data.back();
+ }
+ if (_medianCount == 0) {
+ size_t medianIndex = _data.size() / 2; // chooses the higher if size() is even.
+ _lastMedian = _data[medianIndex];
+ }
- if (_buffer.len() == 0)
- return;
+ // Bump the counters of kept objects better than or equal to _worstSeen/_lastMedian.
+ _worstCount += _data.size(); // everything is better or equal
+ typename std::vector<Data>::iterator firstWorseThanLastMedian =
+ std::upper_bound(_data.begin(), _data.end(), _lastMedian, less);
+ _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);
- std::string compressed;
- snappy::Compress(_buffer.buf(), _buffer.len(), &compressed);
- verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));
-
- try {
- if (compressed.size() < size_t(_buffer.len()/10*9)) {
- const int32_t size = -int32_t(compressed.size()); // negative means compressed
- _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
- _file.write(compressed.data(), compressed.size());
- } else {
- const int32_t size = _buffer.len();
- _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
- _file.write(_buffer.buf(), _buffer.len());
+
+ // Promote _worstSeen or _lastMedian to _cutoff and reset counters if we should.
+ if (_worstCount >= _opts.limit) {
+ if (!_haveCutoff || less(_worstSeen, _cutoff)) {  // only ever tighten the cutoff
+ _cutoff = _worstSeen;
+ _haveCutoff = true;
 }
- } catch (const std::exception&) {
- msgasserted(16821, str::stream() << "error writing to file \"" << _fileName << "\": "
- << sorter::myErrnoWithDescription());
+ _worstCount = 0;
+ }
+ if (_medianCount >= _opts.limit) {
+ if (!_haveCutoff || less(_lastMedian, _cutoff)) {  // only ever tighten the cutoff
+ _cutoff = _lastMedian;
+ _haveCutoff = true;
+ }
+ _medianCount = 0;
 }
-
- _buffer.reset();
 }
- template <typename Key, typename Value>
- SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
- spill();
- _file.close();
- return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter);
+ void spill() {  // Sorts the buffered pairs, writes them out as one run, and empties _data.
+ if (_data.empty())
+ return;
+
+ if (!_opts.extSortAllowed) {
+ // XXX This error message is only correct for aggregation, but it is also the
+ // only way this code could be hit at the moment. If the Sorter is used
+ // elsewhere where extSortAllowed could possibly be false, this message will
+ // need to be revisited.
+ uasserted(16820,
+ str::stream()
+ << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting. Aborting operation."
+ << " Pass allowDiskUse:true to opt in.");
+ }
+
+ sort();
+ updateCutoff();  // must follow sort(): it requires sorted _data
+
+ SortedFileWriter<Key, Value> writer(_opts, _settings);
+ for (size_t i = 0; i < _data.size(); i++) {
+ writer.addAlreadySorted(_data[i].first, _data[i].second);
+ }
+
+ // Clear _data and release the backing array's memory.
+ std::vector<Data>().swap(_data);
+
+ _iters.push_back(std::shared_ptr<Iterator>(writer.done()));  // kept for the merge in done()
+
+ _memUsed = 0;  // _data is empty again
 }
- //
- // Factory Functions
- //
+ const Comparator _comp;
+ const Settings _settings;
+ SortOptions _opts;
+ size_t _memUsed;
+ std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
+ std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
+
+ // See updateCutoff() for a full description of how these members are used.
+ bool _haveCutoff;
+ Data _cutoff; // We can definitely ignore values worse than this.
+ Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
+ size_t _worstCount; // Number of docs better or equal to _worstSeen kept so far.
+ Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit.
+ size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
+};
+
+inline unsigned nextFileNumber() {
+ // This counter is shared across all Sorter types and instances.
+ static AtomicUInt32 fileCounter;
+ return fileCounter.fetchAndAdd(1);  // NOTE(review): presumably yields the pre-increment value
+}
+} // namespace sorter
+
+//
+// SortedFileWriter
+//
+
+
+template <typename Key, typename Value>
+SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, const Settings& settings)
+ : _settings(settings) {
+ namespace str = mongoutils::str;
- template <typename Key, typename Value>
- template <typename Comparator>
- SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
- const std::vector<std::shared_ptr<SortIteratorInterface> >& iters,
- const SortOptions& opts,
- const Comparator& comp) {
- return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);
+ // This should be checked by consumers, but if we get here, don't allow writes.
+ massert(
+ 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
+
+ massert(17148,
+ "Attempting to use external sort without setting SortOptions::tempDir",
+ !opts.tempDir.empty());
+
+ {  // name the file <tempDir>/extsort.<N>, with N taken from the shared counter
+ StringBuilder sb;
+ sb << opts.tempDir << "/extsort." << sorter::nextFileNumber();
+ _fileName = sb.str();
 }
- template <typename Key, typename Value>
- template <typename Comparator>
- Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
- const Comparator& comp,
- const Settings& settings) {
+ boost::filesystem::create_directories(opts.tempDir);  // ensure the directory exists first
- // This should be checked by consumers, but if it isn't try to fail early.
- massert(16947, "Attempting to use external sort from mongos. This is not allowed.",
- !(isMongos() && opts.extSortAllowed));
+ _file.open(_fileName.c_str(), std::ios::binary | std::ios::out);
+ massert(16818,
+ str::stream() << "error opening file \"" << _fileName
+ << "\": " << sorter::myErrnoWithDescription(),
+ _file.good());
- massert(17149, "Attempting to use external sort without setting SortOptions::tempDir",
- !(opts.extSortAllowed && opts.tempDir.empty()));
+ _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName);  // later handed to done()'s iterator
+
+ // From here on, stream errors throw; spill() catches them when writing.
+ _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
+}
+
+template <typename Key, typename Value>
+void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
+ key.serializeForSorter(_buffer);  // key first, then value, appended to the pending buffer
+ val.serializeForSorter(_buffer);
+
+ if (_buffer.len() > 64 * 1024)  // flush once more than 64KB is pending
+ spill();
+}
- switch (opts.limit) {
- case 0: return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
- case 1: return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp);
- default: return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings);
+template <typename Key, typename Value>
+void SortedFileWriter<Key, Value>::spill() {
+ namespace str = mongoutils::str;
+
+ if (_buffer.len() == 0)  // nothing pending
+ return;
+
+ std::string compressed;
+ snappy::Compress(_buffer.buf(), _buffer.len(), &compressed);
+ verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));  // must fit int32_t
+
+ try {
+ if (compressed.size() < size_t(_buffer.len() / 10 * 9)) {  // only if it saves roughly 10%+
+ const int32_t size = -int32_t(compressed.size()); // negative means compressed
+ _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
+ _file.write(compressed.data(), compressed.size());
+ } else {
+ const int32_t size = _buffer.len();  // non-negative: the block is written uncompressed
+ _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
+ _file.write(_buffer.buf(), _buffer.len());
 }
+ } catch (const std::exception&) {  // the constructor enables exceptions on _file
+ msgasserted(16821,
+ str::stream() << "error writing to file \"" << _fileName
+ << "\": " << sorter::myErrnoWithDescription());
 }
+
+ _buffer.reset();
+}
+
+template <typename Key, typename Value>
+SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
+ spill();  // flush anything still buffered
+ _file.close();
+ return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter);  // raw `new`
+}
+
+//
+// Factory Functions
+//
+
+template <typename Key, typename Value>
+template <typename Comparator>
+SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
+ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const SortOptions& opts,
+ const Comparator& comp) {
+ return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);  // thin factory
+}
+
+template <typename Key, typename Value>
+template <typename Comparator>
+Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings) {
+ // This should be checked by consumers, but if it isn't, try to fail early.
+ massert(16947,
+ "Attempting to use external sort from mongos. This is not allowed.",
+ !(isMongos() && opts.extSortAllowed));
+
+ massert(17149,
+ "Attempting to use external sort without setting SortOptions::tempDir",
+ !(opts.extSortAllowed && opts.tempDir.empty()));
+
+ switch (opts.limit) {  // the limit selects the implementation
+ case 0:
+ return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
+ case 1:
+ return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp);  // settings not passed
+ default:
+ return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings);
+ }
+}
}