summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2013-07-01 18:15:42 -0400
committerMathias Stearn <mathias@10gen.com>2013-07-10 17:17:58 -0400
commit0e573d73f6bf7deea31e3f2c6076fe21a07effe0 (patch)
treea01cef281b9d892faa0dea42a36679feb7a24630 /src/mongo/db/pipeline
parent2e2a6fdffdba369a0594962267e5bc7bb47a3f3a (diff)
downloadmongo-0e573d73f6bf7deea31e3f2c6076fe21a07effe0.tar.gz
SERVER-9444 Use Sorter in DocumentSourceSort
For now external sorting is disabled, but this lays the groundwork to enable it.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document.cpp12
-rw-r--r--src/mongo/db/pipeline/document.h7
-rw-r--r--src/mongo/db/pipeline/document_source.h56
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp135
-rw-r--r--src/mongo/db/pipeline/value.cpp11
-rw-r--r--src/mongo/db/pipeline/value.h7
6 files changed, 107 insertions, 121 deletions
diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp
index bdd5b17c7e2..451293482d2 100644
--- a/src/mongo/db/pipeline/document.cpp
+++ b/src/mongo/db/pipeline/document.cpp
@@ -365,4 +365,16 @@ namespace mongo {
return out.str();
}
+
+ // TODO make these functions better
+ void Document::serializeForSorter(BufBuilder& buf) const {
+ BSONObjBuilder bb(buf);
+ toBson(&bb);
+ bb.doneFast();
+ }
+
+ Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
+ BSONObj bson = BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings());
+ return Document(bson);
+ }
}
diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h
index 77c04bafb3f..c50466089a4 100644
--- a/src/mongo/db/pipeline/document.h
+++ b/src/mongo/db/pipeline/document.h
@@ -142,6 +142,13 @@ namespace mongo {
*/
Document clone() const { return Document(storage().clone().get()); }
+ /// members for Sorter
+ struct SorterDeserializeSettings {}; // unused
+ void serializeForSorter(BufBuilder& buf) const;
+ static Document deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&);
+ int memUsageForSorter() const { return getApproximateSize(); }
+ Document getOwned() const { return *this; }
+
// TEMP for compatibility with legacy intrusive_ptr<Document>
Document& operator*() { return *this; }
const Document& operator*() const { return *this; }
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 358b65df438..2103fe80d06 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -19,17 +19,19 @@
#include "mongo/pch.h"
#include <boost/unordered_map.hpp>
-#include "util/intrusive_counter.h"
-#include "db/clientcursor.h"
-#include "db/jsobj.h"
-#include "db/matcher.h"
-#include "db/pipeline/document.h"
-#include "db/pipeline/expression.h"
+
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher.h"
+#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "db/pipeline/value.h"
-#include "util/string_writer.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/value.h"
#include "mongo/db/projection.h"
+#include "mongo/db/sorter/sorter.h"
#include "mongo/s/shard.h"
+#include "mongo/util/intrusive_counter.h"
+#include "mongo/util/string_writer.h"
namespace mongo {
class Accumulator;
@@ -938,48 +940,36 @@ namespace mongo {
void populate();
bool populated;
- // These are called by populate()
- void populateAll(); // no limit
- void populateOne(); // limit == 1
- void populateTopK(); // limit > 1
-
/* these two parallel each other */
typedef vector<intrusive_ptr<ExpressionFieldPath> > SortPaths;
SortPaths vSortKey;
vector<char> vAscending; // used like vector<bool> but without specialization
- struct KeyAndDoc {
- explicit KeyAndDoc(const Document& d, const SortPaths& sp); // extracts sort key
- Value key; // array of keys if vSortKey.size() > 1
- Document doc;
- };
- friend void swap(KeyAndDoc& l, KeyAndDoc& r);
+ /// Extracts the fields in vSortKey from the Document;
+ Value extractKey(const Document& d) const;
- /// Compare two KeyAndDocs according to the specified sort key.
- int compare(const KeyAndDoc& lhs, const KeyAndDoc& rhs) const;
+ /// Compare two Values according to the specified sort key.
+ int compare(const Value& lhs, const Value& rhs) const;
- /*
- This is a utility class just for the STL sort that is done
- inside.
- */
+ typedef Sorter<Value, Document> MySorter;
+
+ // For MySorter
class Comparator {
public:
explicit Comparator(const DocumentSourceSort& source): _source(source) {}
- bool operator()(const KeyAndDoc& lhs, const KeyAndDoc& rhs) const {
- return (_source.compare(lhs, rhs) < 0);
+ int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
+ return _source.compare(lhs.first, rhs.first);
}
private:
const DocumentSourceSort& _source;
};
- deque<KeyAndDoc> documents;
-
intrusive_ptr<DocumentSourceLimit> limitSrc;
+
+ bool _done;
+ Document _current;
+ scoped_ptr<MySorter::Iterator> _output;
};
- inline void swap(DocumentSourceSort::KeyAndDoc& l, DocumentSourceSort::KeyAndDoc& r) {
- l.key.swap(r.key);
- l.doc.swap(r.doc);
- }
class DocumentSourceLimit :
public SplittableDocumentSource {
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index da06157cb0f..8822f5fd073 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -39,7 +39,7 @@ namespace mongo {
if (!populated)
populate();
- return documents.empty();
+ return _done;
}
bool DocumentSourceSort::advance() {
@@ -48,15 +48,21 @@ namespace mongo {
if (!populated)
populate();
- if (!documents.empty())
- documents.pop_front(); // this way we release memory as we go
+ verify(_output);
- return !documents.empty();
+ if (_output->more()) {
+ _current = _output->next().second;
+ }
+ else {
+ _done = true;
+ }
+
+ return !_done;
}
Document DocumentSourceSort::getCurrent() {
- verify(!documents.empty());
- return documents.front().doc;
+ verify(populated && !_done);
+ return _current;
}
void DocumentSourceSort::addToBsonArray(BSONArrayBuilder *pBuilder, bool explain) const {
@@ -89,13 +95,16 @@ namespace mongo {
}
void DocumentSourceSort::dispose() {
- documents.clear();
+ _current = Document();
+ _output.reset();
+ _done = true;
pSource->dispose();
}
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext> &pExpCtx)
: SplittableDocumentSource(pExpCtx)
, populated(false)
+ , _done(false)
{}
long long DocumentSourceSort::getLimit() const {
@@ -195,101 +204,48 @@ namespace mongo {
/* make sure we've got a sort key */
verify(vSortKey.size());
- if (!limitSrc)
- populateAll();
- else if (limitSrc->getLimit() == 1)
- populateOne();
- else
- populateTopK();
-
- populated = true;
- }
-
- void DocumentSourceSort::populateAll() {
- /* track and warn about how much physical memory has been used */
- DocMemMonitor dmm(this);
-
- /* pull everything from the underlying source */
- for (bool hasNext = !pSource->eof(); hasNext; hasNext = pSource->advance()) {
- documents.push_back(KeyAndDoc(pSource->getCurrent(), vSortKey));
- dmm.addToTotal(documents.back().doc.getApproximateSize());
- }
-
- /* sort the list */
- Comparator comparator(*this);
- sort(documents.begin(), documents.end(), comparator);
- }
-
- void DocumentSourceSort::populateOne() {
- if (pSource->eof())
+ if (pSource->eof()) {
+ _done = true;
return;
-
- KeyAndDoc best (pSource->getCurrent(), vSortKey);
- while (pSource->advance()) {
- KeyAndDoc next (pSource->getCurrent(), vSortKey);
- if (compare(next, best) < 0) {
- // we have a new best
- swap(best, next);
- }
}
- documents.push_back(best);
- }
-
- void DocumentSourceSort::populateTopK() {
- bool hasNext = !pSource->eof();
-
- size_t limit = limitSrc->getLimit();
-
- // Pull first K documents unconditionally
- vector<KeyAndDoc> heap;
- heap.reserve(limit);
- for (; hasNext && heap.size() < limit; hasNext = pSource->advance()) {
- heap.push_back(KeyAndDoc(pSource->getCurrent(), vSortKey));
- }
+ SortOptions opts;
+ if (limitSrc)
+ opts.limit = limitSrc->getLimit();
- // We now maintain a MaxHeap of K items. This means that the least-best
- // document is at the top of the heap (heap.front()). If a new
- // document is better than the top of the heap, we pop the top and add
- // the new document to the heap.
+ // TODO make these tunable
+ opts.maxMemoryUsageBytes = 100*1024*1024;
+ opts.extSortAllowed = false;
- Comparator comp (*this);
+ scoped_ptr<MySorter> sorter (MySorter::make(opts, Comparator(*this)));
- // after this, heap.front() is least-best document
- std::make_heap(heap.begin(), heap.end(), comp);
+ do { // pSource->eof() checked above
+ const Document doc = pSource->getCurrent();
+ sorter->add(extractKey(doc), doc);
+ } while (pSource->advance());
- for (; hasNext; hasNext = pSource->advance()) {
- KeyAndDoc next (pSource->getCurrent(), vSortKey);
- if (compare(next, heap.front()) < 0) {
- // remove least-best from heap
- std::pop_heap(heap.begin(), heap.end(), comp);
+ _output.reset(sorter->done());
+ verify(_output->more()); // we put something in so we should get something out
+ _current = _output->next().second;
- // add next to heap
- swap(heap.back(), next);
- std::push_heap(heap.begin(), heap.end(), comp);
- }
- }
-
- std::sort_heap(heap.begin(), heap.end(), comp);
- documents.insert(documents.begin(), heap.begin(), heap.end());
+ populated = true;
}
- DocumentSourceSort::KeyAndDoc::KeyAndDoc(const Document& d, const SortPaths& sp) :doc(d) {
- if (sp.size() == 1) {
- key = sp[0]->evaluate(d);
- return;
+ Value DocumentSourceSort::extractKey(const Document& d) const {
+ if (vSortKey.size() == 1) {
+ return vSortKey[0]->evaluate(d);
}
const Variables vars(d);
vector<Value> keys;
- keys.reserve(sp.size());
- for (size_t i=0; i < sp.size(); i++) {
- keys.push_back(sp[i]->evaluate(vars));
+ keys.reserve(vSortKey.size());
+ for (size_t i=0; i < vSortKey.size(); i++) {
+ keys.push_back(vSortKey[i]->evaluate(vars));
}
- key = Value::consume(keys);
+ return Value::consume(keys);
}
- int DocumentSourceSort::compare(const KeyAndDoc & lhs, const KeyAndDoc & rhs) const {
+ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
/*
populate() already checked that there is a non-empty sort key,
@@ -301,14 +257,14 @@ namespace mongo {
const size_t n = vSortKey.size();
if (n == 1) { // simple fast case
if (vAscending[0])
- return Value::compare(lhs.key, rhs.key);
+ return Value::compare(lhs, rhs);
else
- return -Value::compare(lhs.key, rhs.key);
+ return -Value::compare(lhs, rhs);
}
// compound sort
for (size_t i = 0; i < n; i++) {
- int cmp = Value::compare(lhs.key[i], rhs.key[i]);
+ int cmp = Value::compare(lhs[i], rhs[i]);
if (cmp) {
/* if necessary, adjust the return value by the key ordering */
if (!vAscending[i])
@@ -325,3 +281,6 @@ namespace mongo {
return 0;
}
}
+
+#include "db/sorter/sorter.cpp"
+// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp
index 664ffb990dc..e3153796b92 100644
--- a/src/mongo/db/pipeline/value.cpp
+++ b/src/mongo/db/pipeline/value.cpp
@@ -916,4 +916,15 @@ namespace mongo {
// Not in default case to trigger better warning if a case is missing
verify(false);
}
+
+ // TODO make these functions better
+ void Value::serializeForSorter(BufBuilder& buf) const {
+ BSONObjBuilder bb(buf);
+ addToBsonObj(&bb, "");
+ bb.doneFast();
+ }
+ Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
+ BSONObj bson = BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings());
+ return Value(bson.firstElement());
+ }
}
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index 86cb7fa570c..def84abefa6 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -234,6 +234,13 @@ namespace mongo {
/// Call this after memcpying to update ref counts if needed
void memcpyed() const { _storage.memcpyed(); }
+ /// members for Sorter
+ struct SorterDeserializeSettings {}; // unused
+ void serializeForSorter(BufBuilder& buf) const;
+ static Value deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&);
+ int memUsageForSorter() const { return getApproximateSize(); }
+ Value getOwned() const { return *this; }
+
private:
/** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument
* types. bool is especially bad since without this it will accept any pointer.