diff options
author | Mathias Stearn <mathias@10gen.com> | 2013-07-01 18:15:42 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2013-07-10 17:17:58 -0400 |
commit | 0e573d73f6bf7deea31e3f2c6076fe21a07effe0 (patch) | |
tree | a01cef281b9d892faa0dea42a36679feb7a24630 /src/mongo/db/pipeline | |
parent | 2e2a6fdffdba369a0594962267e5bc7bb47a3f3a (diff) | |
download | mongo-0e573d73f6bf7deea31e3f2c6076fe21a07effe0.tar.gz |
SERVER-9444 Use Sorter in DocumentSourceSort
For now, external sorting is disabled, but this lays the groundwork to
enable it.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 56 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 135 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.h | 7 |
6 files changed, 107 insertions, 121 deletions
diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp index bdd5b17c7e2..451293482d2 100644 --- a/src/mongo/db/pipeline/document.cpp +++ b/src/mongo/db/pipeline/document.cpp @@ -365,4 +365,16 @@ namespace mongo { return out.str(); } + + // TODO make these functions better + void Document::serializeForSorter(BufBuilder& buf) const { + BSONObjBuilder bb(buf); + toBson(&bb); + bb.doneFast(); + } + + Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) { + BSONObj bson = BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings()); + return Document(bson); + } } diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h index 77c04bafb3f..c50466089a4 100644 --- a/src/mongo/db/pipeline/document.h +++ b/src/mongo/db/pipeline/document.h @@ -142,6 +142,13 @@ namespace mongo { */ Document clone() const { return Document(storage().clone().get()); } + /// members for Sorter + struct SorterDeserializeSettings {}; // unused + void serializeForSorter(BufBuilder& buf) const; + static Document deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&); + int memUsageForSorter() const { return getApproximateSize(); } + Document getOwned() const { return *this; } + // TEMP for compatibility with legacy intrusive_ptr<Document> Document& operator*() { return *this; } const Document& operator*() const { return *this; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 358b65df438..2103fe80d06 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -19,17 +19,19 @@ #include "mongo/pch.h" #include <boost/unordered_map.hpp> -#include "util/intrusive_counter.h" -#include "db/clientcursor.h" -#include "db/jsobj.h" -#include "db/matcher.h" -#include "db/pipeline/document.h" -#include "db/pipeline/expression.h" + +#include "mongo/db/clientcursor.h" +#include "mongo/db/jsobj.h" 
+#include "mongo/db/matcher.h" +#include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression_context.h" -#include "db/pipeline/value.h" -#include "util/string_writer.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/value.h" #include "mongo/db/projection.h" +#include "mongo/db/sorter/sorter.h" #include "mongo/s/shard.h" +#include "mongo/util/intrusive_counter.h" +#include "mongo/util/string_writer.h" namespace mongo { class Accumulator; @@ -938,48 +940,36 @@ namespace mongo { void populate(); bool populated; - // These are called by populate() - void populateAll(); // no limit - void populateOne(); // limit == 1 - void populateTopK(); // limit > 1 - /* these two parallel each other */ typedef vector<intrusive_ptr<ExpressionFieldPath> > SortPaths; SortPaths vSortKey; vector<char> vAscending; // used like vector<bool> but without specialization - struct KeyAndDoc { - explicit KeyAndDoc(const Document& d, const SortPaths& sp); // extracts sort key - Value key; // array of keys if vSortKey.size() > 1 - Document doc; - }; - friend void swap(KeyAndDoc& l, KeyAndDoc& r); + /// Extracts the fields in vSortKey from the Document; + Value extractKey(const Document& d) const; - /// Compare two KeyAndDocs according to the specified sort key. - int compare(const KeyAndDoc& lhs, const KeyAndDoc& rhs) const; + /// Compare two Values according to the specified sort key. + int compare(const Value& lhs, const Value& rhs) const; - /* - This is a utility class just for the STL sort that is done - inside. 
- */ + typedef Sorter<Value, Document> MySorter; + + // For MySorter class Comparator { public: explicit Comparator(const DocumentSourceSort& source): _source(source) {} - bool operator()(const KeyAndDoc& lhs, const KeyAndDoc& rhs) const { - return (_source.compare(lhs, rhs) < 0); + int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const { + return _source.compare(lhs.first, rhs.first); } private: const DocumentSourceSort& _source; }; - deque<KeyAndDoc> documents; - intrusive_ptr<DocumentSourceLimit> limitSrc; + + bool _done; + Document _current; + scoped_ptr<MySorter::Iterator> _output; }; - inline void swap(DocumentSourceSort::KeyAndDoc& l, DocumentSourceSort::KeyAndDoc& r) { - l.key.swap(r.key); - l.doc.swap(r.doc); - } class DocumentSourceLimit : public SplittableDocumentSource { diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index da06157cb0f..8822f5fd073 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -39,7 +39,7 @@ namespace mongo { if (!populated) populate(); - return documents.empty(); + return _done; } bool DocumentSourceSort::advance() { @@ -48,15 +48,21 @@ namespace mongo { if (!populated) populate(); - if (!documents.empty()) - documents.pop_front(); // this way we release memory as we go + verify(_output); - return !documents.empty(); + if (_output->more()) { + _current = _output->next().second; + } + else { + _done = true; + } + + return !_done; } Document DocumentSourceSort::getCurrent() { - verify(!documents.empty()); - return documents.front().doc; + verify(populated && !_done); + return _current; } void DocumentSourceSort::addToBsonArray(BSONArrayBuilder *pBuilder, bool explain) const { @@ -89,13 +95,16 @@ namespace mongo { } void DocumentSourceSort::dispose() { - documents.clear(); + _current = Document(); + _output.reset(); + _done = true; pSource->dispose(); } 
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext> &pExpCtx) : SplittableDocumentSource(pExpCtx) , populated(false) + , _done(false) {} long long DocumentSourceSort::getLimit() const { @@ -195,101 +204,48 @@ namespace mongo { /* make sure we've got a sort key */ verify(vSortKey.size()); - if (!limitSrc) - populateAll(); - else if (limitSrc->getLimit() == 1) - populateOne(); - else - populateTopK(); - - populated = true; - } - - void DocumentSourceSort::populateAll() { - /* track and warn about how much physical memory has been used */ - DocMemMonitor dmm(this); - - /* pull everything from the underlying source */ - for (bool hasNext = !pSource->eof(); hasNext; hasNext = pSource->advance()) { - documents.push_back(KeyAndDoc(pSource->getCurrent(), vSortKey)); - dmm.addToTotal(documents.back().doc.getApproximateSize()); - } - - /* sort the list */ - Comparator comparator(*this); - sort(documents.begin(), documents.end(), comparator); - } - - void DocumentSourceSort::populateOne() { - if (pSource->eof()) + if (pSource->eof()) { + _done = true; return; - - KeyAndDoc best (pSource->getCurrent(), vSortKey); - while (pSource->advance()) { - KeyAndDoc next (pSource->getCurrent(), vSortKey); - if (compare(next, best) < 0) { - // we have a new best - swap(best, next); - } } - documents.push_back(best); - } - - void DocumentSourceSort::populateTopK() { - bool hasNext = !pSource->eof(); - - size_t limit = limitSrc->getLimit(); - - // Pull first K documents unconditionally - vector<KeyAndDoc> heap; - heap.reserve(limit); - for (; hasNext && heap.size() < limit; hasNext = pSource->advance()) { - heap.push_back(KeyAndDoc(pSource->getCurrent(), vSortKey)); - } + SortOptions opts; + if (limitSrc) + opts.limit = limitSrc->getLimit(); - // We now maintain a MaxHeap of K items. This means that the least-best - // document is at the top of the heap (heap.front()). 
If a new - // document is better than the top of the heap, we pop the top and add - // the new document to the heap. + // TODO make these tunable + opts.maxMemoryUsageBytes = 100*1024*1024; + opts.extSortAllowed = false; - Comparator comp (*this); + scoped_ptr<MySorter> sorter (MySorter::make(opts, Comparator(*this))); - // after this, heap.front() is least-best document - std::make_heap(heap.begin(), heap.end(), comp); + do { // pSource->eof() checked above + const Document doc = pSource->getCurrent(); + sorter->add(extractKey(doc), doc); + } while (pSource->advance()); - for (; hasNext; hasNext = pSource->advance()) { - KeyAndDoc next (pSource->getCurrent(), vSortKey); - if (compare(next, heap.front()) < 0) { - // remove least-best from heap - std::pop_heap(heap.begin(), heap.end(), comp); + _output.reset(sorter->done()); + verify(_output->more()); // we put something in so we should get something out + _current = _output->next().second; - // add next to heap - swap(heap.back(), next); - std::push_heap(heap.begin(), heap.end(), comp); - } - } - - std::sort_heap(heap.begin(), heap.end(), comp); - documents.insert(documents.begin(), heap.begin(), heap.end()); + populated = true; } - DocumentSourceSort::KeyAndDoc::KeyAndDoc(const Document& d, const SortPaths& sp) :doc(d) { - if (sp.size() == 1) { - key = sp[0]->evaluate(d); - return; + Value DocumentSourceSort::extractKey(const Document& d) const { + if (vSortKey.size() == 1) { + return vSortKey[0]->evaluate(d); } const Variables vars(d); vector<Value> keys; - keys.reserve(sp.size()); - for (size_t i=0; i < sp.size(); i++) { - keys.push_back(sp[i]->evaluate(vars)); + keys.reserve(vSortKey.size()); + for (size_t i=0; i < vSortKey.size(); i++) { + keys.push_back(vSortKey[i]->evaluate(vars)); } - key = Value::consume(keys); + return Value::consume(keys); } - int DocumentSourceSort::compare(const KeyAndDoc & lhs, const KeyAndDoc & rhs) const { + int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { 
/* populate() already checked that there is a non-empty sort key, @@ -301,14 +257,14 @@ namespace mongo { const size_t n = vSortKey.size(); if (n == 1) { // simple fast case if (vAscending[0]) - return Value::compare(lhs.key, rhs.key); + return Value::compare(lhs, rhs); else - return -Value::compare(lhs.key, rhs.key); + return -Value::compare(lhs, rhs); } // compound sort for (size_t i = 0; i < n; i++) { - int cmp = Value::compare(lhs.key[i], rhs.key[i]); + int cmp = Value::compare(lhs[i], rhs[i]); if (cmp) { /* if necessary, adjust the return value by the key ordering */ if (!vAscending[i]) @@ -325,3 +281,6 @@ namespace mongo { return 0; } } + +#include "db/sorter/sorter.cpp" +// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp index 664ffb990dc..e3153796b92 100644 --- a/src/mongo/db/pipeline/value.cpp +++ b/src/mongo/db/pipeline/value.cpp @@ -916,4 +916,15 @@ namespace mongo { // Not in default case to trigger better warning if a case is missing verify(false); } + + // TODO make these functions better + void Value::serializeForSorter(BufBuilder& buf) const { + BSONObjBuilder bb(buf); + addToBsonObj(&bb, ""); + bb.doneFast(); + } + Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) { + BSONObj bson = BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings()); + return Value(bson.firstElement()); + } } diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index 86cb7fa570c..def84abefa6 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -234,6 +234,13 @@ namespace mongo { /// Call this after memcpying to update ref counts if needed void memcpyed() const { _storage.memcpyed(); } + /// members for Sorter + struct SorterDeserializeSettings {}; // unused + void serializeForSorter(BufBuilder& buf) const; + static Value deserializeForSorter(BufReader& buf, 
const SorterDeserializeSettings&); + int memUsageForSorter() const { return getApproximateSize(); } + Value getOwned() const { return *this; } + private: /** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument * types. bool is especially bad since without this it will accept any pointer. |