summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author David Storch <david.storch@mongodb.com> 2019-11-04 17:53:43 +0000
committer evergreen <evergreen@mongodb.com> 2019-11-04 17:53:43 +0000
commit b1ecef421c634b17d88eedc68864cdc886507dbe (patch)
tree a04dd3f972c6ea36faace63f8e91be8e5771cf5b
parent 80e0497ebaaaf87707239f661cac2ab99ca9a88c (diff)
download mongo-b1ecef421c634b17d88eedc68864cdc886507dbe.tar.gz
SERVER-44324 Template the SortExecutor.
This improves the performance of blocking sort operations executed in the agg layer of the execution tree.
-rw-r--r-- src/mongo/db/exec/sort.cpp 2
-rw-r--r-- src/mongo/db/exec/sort.h 2
-rw-r--r-- src/mongo/db/exec/sort_executor.cpp 109
-rw-r--r-- src/mongo/db/exec/sort_executor.h 116
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.h 2
6 files changed, 97 insertions, 136 deletions
diff --git a/src/mongo/db/exec/sort.cpp b/src/mongo/db/exec/sort.cpp
index a7c4c045234..fa6955bd79f 100644
--- a/src/mongo/db/exec/sort.cpp
+++ b/src/mongo/db/exec/sort.cpp
@@ -98,7 +98,7 @@ PlanStage::StageState SortStage::doWork(WorkingSetID* out) {
return code;
}
- auto nextWsm = _sortExecutor.getNextWsm();
+ auto nextWsm = _sortExecutor.getNext();
if (!nextWsm) {
return PlanStage::IS_EOF;
}
diff --git a/src/mongo/db/exec/sort.h b/src/mongo/db/exec/sort.h
index b727771c8bb..3f8f04a43b5 100644
--- a/src/mongo/db/exec/sort.h
+++ b/src/mongo/db/exec/sort.h
@@ -80,7 +80,7 @@ private:
// Not owned by us.
WorkingSet* _ws;
- SortExecutor _sortExecutor;
+ SortExecutor<WorkingSetMember> _sortExecutor;
// Whether or not we have finished loading data into '_sortExecutor'.
bool _populated = false;
diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp
index bf2d02a465d..3cf2521c13d 100644
--- a/src/mongo/db/exec/sort_executor.cpp
+++ b/src/mongo/db/exec/sort_executor.cpp
@@ -48,107 +48,14 @@ std::string nextFileName() {
return "extsort-sort-executor." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1));
}
} // namespace
-
-SortExecutor::SortExecutor(SortPattern sortPattern,
- uint64_t limit,
- uint64_t maxMemoryUsageBytes,
- std::string tempDir,
- bool allowDiskUse)
- : _sortPattern(std::move(sortPattern)),
- _tempDir(std::move(tempDir)),
- _diskUseAllowed(allowDiskUse) {
- _stats.sortPattern =
- _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson();
- _stats.limit = limit;
- _stats.maxMemoryUsageBytes = maxMemoryUsageBytes;
-}
-
-boost::optional<Document> SortExecutor::getNextDoc() {
- auto wsm = getNextWsm();
- if (!wsm) {
- return boost::none;
- }
-
- // Ensure that this WorkingSetMember only houses a Document. This guarantees that we are not
- // discarding data inside the working set member (e.g. the RecordId) when returning the Document
- // to the caller.
- invariant(wsm->hasOwnedObj());
-
- // Transfer metadata from the WorkingSetMember to the Document.
- MutableDocument mutableDoc{std::move(wsm->doc.value())};
- mutableDoc.setMetadata(wsm->releaseMetadata());
-
- return mutableDoc.freeze();
-}
-
-boost::optional<WorkingSetMember> SortExecutor::getNextWsm() {
- if (_isEOF) {
- return boost::none;
- }
-
- if (!_output->more()) {
- _output.reset();
- _isEOF = true;
- return boost::none;
- }
-
- return _output->next().second;
-}
-
-void SortExecutor::add(Value sortKey, Document data) {
- invariant(data.isOwned());
- WorkingSetMember wsm;
-
- // Transfer metadata from the Document to the WorkingSetMember.
- MutableDocument mutableDoc{std::move(data)};
- wsm.setMetadata(mutableDoc.releaseMetadata());
-
- wsm.doc.setValue(mutableDoc.freeze());
- wsm.transitionToOwnedObj();
- this->add(std::move(sortKey), std::move(wsm));
-}
-
-void SortExecutor::add(Value sortKey, WorkingSetMember data) {
- // Metadata should be attached directly to the WSM rather than inside the Document.
- invariant(!data.doc.value().metadata());
-
- if (!_sorter) {
- _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
- }
- _sorter->add(std::move(sortKey), std::move(data));
-
- _stats.totalDataSizeBytes += data.getMemUsage();
-}
-
-void SortExecutor::loadingDone() {
- // This conditional should only pass if no documents were added to the sorter.
- if (!_sorter) {
- _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
- }
- _output.reset(_sorter->done());
- _stats.wasDiskUsed = _stats.wasDiskUsed || _sorter->usedDisk();
- _sorter.reset();
-}
-
-SortOptions SortExecutor::makeSortOptions() const {
- SortOptions opts;
- if (_stats.limit) {
- opts.limit = _stats.limit;
- }
-
- opts.maxMemoryUsageBytes = _stats.maxMemoryUsageBytes;
- if (_diskUseAllowed) {
- opts.extSortAllowed = true;
- opts.tempDir = _tempDir;
- }
-
- return opts;
-}
-
-std::unique_ptr<SortStats> SortExecutor::cloneStats() const {
- return std::unique_ptr<SortStats>{static_cast<SortStats*>(_stats.clone())};
-}
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
-// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
+
+// Instantiate Sorter templates for sorting both Document and WorkingSetMember.
+MONGO_CREATE_SORTER(mongo::Value,
+ mongo::Document,
+ mongo::SortExecutor<mongo::Document>::Comparator);
+MONGO_CREATE_SORTER(mongo::Value,
+ mongo::WorkingSetMember,
+ mongo::SortExecutor<mongo::WorkingSetMember>::Comparator);
diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h
index 085311e19da..6f0bac4b5c9 100644
--- a/src/mongo/db/exec/sort_executor.h
+++ b/src/mongo/db/exec/sort_executor.h
@@ -43,9 +43,27 @@ namespace mongo {
* caller should provide input documents by repeated calls to the add() function, and then
* complete the loading process with a single call to loadingDone(). Finally, getNext() should be
* called to return the documents one by one in sorted order.
+ *
+ * The template parameter is the type of data being sorted. In DocumentSource execution, we sort
+ * Document objects directly, but in the PlanStage layer we may sort WorkingSetMembers. The type of
+ * the sort key, on the other hand, is always Value.
*/
+template <typename T>
class SortExecutor {
public:
+ using DocumentSorter = Sorter<Value, T>;
+ class Comparator {
+ public:
+ Comparator(const SortPattern& sortPattern) : _sortKeyComparator(sortPattern) {}
+ int operator()(const typename DocumentSorter::Data& lhs,
+ const typename DocumentSorter::Data& rhs) const {
+ return _sortKeyComparator(lhs.first, rhs.first);
+ }
+
+ private:
+ SortKeyComparator _sortKeyComparator;
+ };
+
/**
* If the passed in limit is 0, this is treated as no limit.
*/
@@ -53,11 +71,15 @@ public:
uint64_t limit,
uint64_t maxMemoryUsageBytes,
std::string tempDir,
- bool allowDiskUse);
-
- boost::optional<Document> getNextDoc();
-
- boost::optional<WorkingSetMember> getNextWsm();
+ bool allowDiskUse)
+ : _sortPattern(std::move(sortPattern)),
+ _tempDir(std::move(tempDir)),
+ _diskUseAllowed(allowDiskUse) {
+ _stats.sortPattern =
+ _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson();
+ _stats.limit = limit;
+ _stats.maxMemoryUsageBytes = maxMemoryUsageBytes;
+ }
const SortPattern& sortPattern() const {
return _sortPattern;
@@ -85,55 +107,87 @@ public:
}
/**
- * Signals to the sort executor that there will be no more input documents.
+ * Returns true if the loading phase has been explicitly completed, and then the stream of
+ * documents has subsequently been exhausted by "get next" calls.
*/
- void loadingDone();
+ bool isEOF() const {
+ return _isEOF;
+ }
+
+ const SortStats& stats() const {
+ return _stats;
+ }
+
+ std::unique_ptr<SortStats> cloneStats() const {
+ return std::unique_ptr<SortStats>{static_cast<SortStats*>(_stats.clone())};
+ }
/**
- * Add a Document with sort key specified by Value to the DocumentSorter.
+ * Add data item to be sorted of type T with sort key specified by Value to the sort executor.
+ * Should only be called before 'loadingDone()' is called.
*/
- void add(Value, Document);
+ void add(Value sortKey, T data) {
+ if (!_sorter) {
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
+ }
+ _sorter->add(std::move(sortKey), std::move(data));
+
+ _stats.totalDataSizeBytes += data.memUsageForSorter();
+ }
/**
- * Add a WorkingSetMember with sort key specified by Value to the DocumentSorter.
+ * Signals to the sort executor that there will be no more input documents.
*/
- void add(Value, WorkingSetMember);
+ void loadingDone() {
+ // This conditional should only pass if no documents were added to the sorter.
+ if (!_sorter) {
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
+ }
+ _output.reset(_sorter->done());
+ _stats.wasDiskUsed = _stats.wasDiskUsed || _sorter->usedDisk();
+ _sorter.reset();
+ }
/**
- * Returns true if the loading phase has been explicitly completed, and then the stream of
- * documents has subsequently been exhausted by "get next" calls.
+ * Returns the next data item in the sorted stream, or boost::none for end-of-stream. Should
+ * only be called after 'loadingDone()' is called.
*/
- bool isEOF() const {
- return _isEOF;
- }
+ boost::optional<T> getNext() {
+ if (_isEOF) {
+ return boost::none;
+ }
- const SortStats& stats() const {
- return _stats;
- }
+ if (!_output->more()) {
+ _output.reset();
+ _isEOF = true;
+ return boost::none;
+ }
- std::unique_ptr<SortStats> cloneStats() const;
+ return _output->next().second;
+ }
private:
- using DocumentSorter = Sorter<Value, WorkingSetMember>;
- class Comparator {
- public:
- Comparator(const SortPattern& sortPattern) : _sortKeyComparator(sortPattern) {}
- int operator()(const DocumentSorter::Data& lhs, const DocumentSorter::Data& rhs) const {
- return _sortKeyComparator(lhs.first, rhs.first);
+ SortOptions makeSortOptions() const {
+ SortOptions opts;
+ if (_stats.limit) {
+ opts.limit = _stats.limit;
}
- private:
- SortKeyComparator _sortKeyComparator;
- };
+ opts.maxMemoryUsageBytes = _stats.maxMemoryUsageBytes;
+ if (_diskUseAllowed) {
+ opts.extSortAllowed = true;
+ opts.tempDir = _tempDir;
+ }
- SortOptions makeSortOptions() const;
+ return opts;
+ }
const SortPattern _sortPattern;
const std::string _tempDir;
const bool _diskUseAllowed;
std::unique_ptr<DocumentSorter> _sorter;
- std::unique_ptr<DocumentSorter::Iterator> _output;
+ std::unique_ptr<typename DocumentSorter::Iterator> _output;
SortStats _stats;
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 538a5751649..a64b9741f37 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -86,7 +86,7 @@ DocumentSource::GetNextResult DocumentSourceSort::doGetNext() {
invariant(populationResult.isEOF());
}
- auto result = _sortExecutor->getNextDoc();
+ auto result = _sortExecutor->getNext();
if (!result)
return GetNextResult::makeEOF();
return GetNextResult(std::move(*result));
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 9defe4da947..69ff2b8abb1 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -183,7 +183,7 @@ private:
bool _populated = false;
- boost::optional<SortExecutor> _sortExecutor;
+ boost::optional<SortExecutor<Document>> _sortExecutor;
boost::optional<SortKeyGenerator> _sortKeyGen;
};