diff options
author | David Storch <david.storch@mongodb.com> | 2019-11-04 17:53:43 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-04 17:53:43 +0000 |
commit | b1ecef421c634b17d88eedc68864cdc886507dbe (patch) | |
tree | a04dd3f972c6ea36faace63f8e91be8e5771cf5b | |
parent | 80e0497ebaaaf87707239f661cac2ab99ca9a88c (diff) | |
download | mongo-b1ecef421c634b17d88eedc68864cdc886507dbe.tar.gz |
SERVER-44324 Template the SortExecutor.
This improves the performance of blocking sort operations
executed in the agg layer of the execution tree.
-rw-r--r-- | src/mongo/db/exec/sort.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/sort.h | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/sort_executor.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/exec/sort_executor.h | 116 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 2 |
6 files changed, 97 insertions, 136 deletions
diff --git a/src/mongo/db/exec/sort.cpp b/src/mongo/db/exec/sort.cpp index a7c4c045234..fa6955bd79f 100644 --- a/src/mongo/db/exec/sort.cpp +++ b/src/mongo/db/exec/sort.cpp @@ -98,7 +98,7 @@ PlanStage::StageState SortStage::doWork(WorkingSetID* out) { return code; } - auto nextWsm = _sortExecutor.getNextWsm(); + auto nextWsm = _sortExecutor.getNext(); if (!nextWsm) { return PlanStage::IS_EOF; } diff --git a/src/mongo/db/exec/sort.h b/src/mongo/db/exec/sort.h index b727771c8bb..3f8f04a43b5 100644 --- a/src/mongo/db/exec/sort.h +++ b/src/mongo/db/exec/sort.h @@ -80,7 +80,7 @@ private: // Not owned by us. WorkingSet* _ws; - SortExecutor _sortExecutor; + SortExecutor<WorkingSetMember> _sortExecutor; // Whether or not we have finished loading data into '_sortExecutor'. bool _populated = false; diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp index bf2d02a465d..3cf2521c13d 100644 --- a/src/mongo/db/exec/sort_executor.cpp +++ b/src/mongo/db/exec/sort_executor.cpp @@ -48,107 +48,14 @@ std::string nextFileName() { return "extsort-sort-executor." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1)); } } // namespace - -SortExecutor::SortExecutor(SortPattern sortPattern, - uint64_t limit, - uint64_t maxMemoryUsageBytes, - std::string tempDir, - bool allowDiskUse) - : _sortPattern(std::move(sortPattern)), - _tempDir(std::move(tempDir)), - _diskUseAllowed(allowDiskUse) { - _stats.sortPattern = - _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson(); - _stats.limit = limit; - _stats.maxMemoryUsageBytes = maxMemoryUsageBytes; -} - -boost::optional<Document> SortExecutor::getNextDoc() { - auto wsm = getNextWsm(); - if (!wsm) { - return boost::none; - } - - // Ensure that this WorkingSetMember only houses a Document. This guarantees that we are not - // discarding data inside the working set member (e.g. the RecordId) when returning the Document - // to the caller. - invariant(wsm->hasOwnedObj()); - - // Transfer metadata from the WorkingSetMember to the Document. - MutableDocument mutableDoc{std::move(wsm->doc.value())}; - mutableDoc.setMetadata(wsm->releaseMetadata()); - - return mutableDoc.freeze(); -} - -boost::optional<WorkingSetMember> SortExecutor::getNextWsm() { - if (_isEOF) { - return boost::none; - } - - if (!_output->more()) { - _output.reset(); - _isEOF = true; - return boost::none; - } - - return _output->next().second; -} - -void SortExecutor::add(Value sortKey, Document data) { - invariant(data.isOwned()); - WorkingSetMember wsm; - - // Transfer metadata from the Document to the WorkingSetMember. - MutableDocument mutableDoc{std::move(data)}; - wsm.setMetadata(mutableDoc.releaseMetadata()); - - wsm.doc.setValue(mutableDoc.freeze()); - wsm.transitionToOwnedObj(); - this->add(std::move(sortKey), std::move(wsm)); -} - -void SortExecutor::add(Value sortKey, WorkingSetMember data) { - // Metadata should be attached directly to the WSM rather than inside the Document. - invariant(!data.doc.value().metadata()); - - if (!_sorter) { - _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); - } - _sorter->add(std::move(sortKey), std::move(data)); - - _stats.totalDataSizeBytes += data.getMemUsage(); -} - -void SortExecutor::loadingDone() { - // This conditional should only pass if no documents were added to the sorter. - if (!_sorter) { - _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); - } - _output.reset(_sorter->done()); - _stats.wasDiskUsed = _stats.wasDiskUsed || _sorter->usedDisk(); - _sorter.reset(); -} - -SortOptions SortExecutor::makeSortOptions() const { - SortOptions opts; - if (_stats.limit) { - opts.limit = _stats.limit; - } - - opts.maxMemoryUsageBytes = _stats.maxMemoryUsageBytes; - if (_diskUseAllowed) { - opts.extSortAllowed = true; - opts.tempDir = _tempDir; - } - - return opts; -} - -std::unique_ptr<SortStats> SortExecutor::cloneStats() const { - return std::unique_ptr<SortStats>{static_cast<SortStats*>(_stats.clone())}; -} } // namespace mongo #include "mongo/db/sorter/sorter.cpp" -// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. + +// Instantiate Sorter templates for sorting both Document and WorkingSetMember. +MONGO_CREATE_SORTER(mongo::Value, + mongo::Document, + mongo::SortExecutor<mongo::Document>::Comparator); +MONGO_CREATE_SORTER(mongo::Value, + mongo::WorkingSetMember, + mongo::SortExecutor<mongo::WorkingSetMember>::Comparator); diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h index 085311e19da..6f0bac4b5c9 100644 --- a/src/mongo/db/exec/sort_executor.h +++ b/src/mongo/db/exec/sort_executor.h @@ -43,9 +43,27 @@ namespace mongo { * caller should provide input documents by repeated calls to the add() function, and then * complete the loading process with a single call to loadingDone(). Finally, getNext() should be * called to return the documents one by one in sorted order. + * + * The template parameter is the type of data being sorted. In DocumentSource execution, we sort + * Document objects directly, but in the PlanStage layer we may sort WorkingSetMembers. The type of + * the sort key, on the other hand, is always Value. */ +template <typename T> class SortExecutor { public: + using DocumentSorter = Sorter<Value, T>; + class Comparator { + public: + Comparator(const SortPattern& sortPattern) : _sortKeyComparator(sortPattern) {} + int operator()(const typename DocumentSorter::Data& lhs, + const typename DocumentSorter::Data& rhs) const { + return _sortKeyComparator(lhs.first, rhs.first); + } + + private: + SortKeyComparator _sortKeyComparator; + }; + /** * If the passed in limit is 0, this is treated as no limit. */ @@ -53,11 +71,15 @@ public: uint64_t limit, uint64_t maxMemoryUsageBytes, std::string tempDir, - bool allowDiskUse); - - boost::optional<Document> getNextDoc(); - - boost::optional<WorkingSetMember> getNextWsm(); + bool allowDiskUse) + : _sortPattern(std::move(sortPattern)), + _tempDir(std::move(tempDir)), + _diskUseAllowed(allowDiskUse) { + _stats.sortPattern = + _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson(); + _stats.limit = limit; + _stats.maxMemoryUsageBytes = maxMemoryUsageBytes; + } const SortPattern& sortPattern() const { return _sortPattern; @@ -85,55 +107,87 @@ public: } /** - * Signals to the sort executor that there will be no more input documents. + * Returns true if the loading phase has been explicitly completed, and then the stream of + * documents has subsequently been exhausted by "get next" calls. */ - void loadingDone(); + bool isEOF() const { + return _isEOF; + } + + const SortStats& stats() const { + return _stats; + } + + std::unique_ptr<SortStats> cloneStats() const { + return std::unique_ptr<SortStats>{static_cast<SortStats*>(_stats.clone())}; + } /** - * Add a Document with sort key specified by Value to the DocumentSorter. + * Add data item to be sorted of type T with sort key specified by Value to the sort executor. + * Should only be called before 'loadingDone()' is called. */ - void add(Value, Document); + void add(Value sortKey, T data) { + if (!_sorter) { + _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); + } + _sorter->add(std::move(sortKey), std::move(data)); + + _stats.totalDataSizeBytes += data.memUsageForSorter(); + } /** - * Add a WorkingSetMember with sort key specified by Value to the DocumentSorter. + * Signals to the sort executor that there will be no more input documents. */ - void add(Value, WorkingSetMember); + void loadingDone() { + // This conditional should only pass if no documents were added to the sorter. + if (!_sorter) { + _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); + } + _output.reset(_sorter->done()); + _stats.wasDiskUsed = _stats.wasDiskUsed || _sorter->usedDisk(); + _sorter.reset(); + } /** - * Returns true if the loading phase has been explicitly completed, and then the stream of - * documents has subsequently been exhausted by "get next" calls. + * Returns the next data item in the sorted stream, or boost::none for end-of-stream. Should + * only be called after 'loadingDone()' is called. */ - bool isEOF() const { - return _isEOF; - } + boost::optional<T> getNext() { + if (_isEOF) { + return boost::none; + } - const SortStats& stats() const { - return _stats; - } + if (!_output->more()) { + _output.reset(); + _isEOF = true; + return boost::none; + } - std::unique_ptr<SortStats> cloneStats() const; + return _output->next().second; + } private: - using DocumentSorter = Sorter<Value, WorkingSetMember>; - class Comparator { - public: - Comparator(const SortPattern& sortPattern) : _sortKeyComparator(sortPattern) {} - int operator()(const DocumentSorter::Data& lhs, const DocumentSorter::Data& rhs) const { - return _sortKeyComparator(lhs.first, rhs.first); + SortOptions makeSortOptions() const { + SortOptions opts; + if (_stats.limit) { + opts.limit = _stats.limit; } - private: - SortKeyComparator _sortKeyComparator; - }; + opts.maxMemoryUsageBytes = _stats.maxMemoryUsageBytes; + if (_diskUseAllowed) { + opts.extSortAllowed = true; + opts.tempDir = _tempDir; + } - SortOptions makeSortOptions() const; + return opts; + } const SortPattern _sortPattern; const std::string _tempDir; const bool _diskUseAllowed; std::unique_ptr<DocumentSorter> _sorter; - std::unique_ptr<DocumentSorter::Iterator> _output; + std::unique_ptr<typename DocumentSorter::Iterator> _output; SortStats _stats; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 538a5751649..a64b9741f37 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -86,7 +86,7 @@ DocumentSource::GetNextResult DocumentSourceSort::doGetNext() { invariant(populationResult.isEOF()); } - auto result = _sortExecutor->getNextDoc(); + auto result = _sortExecutor->getNext(); if (!result) return GetNextResult::makeEOF(); return GetNextResult(std::move(*result)); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 9defe4da947..69ff2b8abb1 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -183,7 +183,7 @@ private: bool _populated = false; - boost::optional<SortExecutor> _sortExecutor; + boost::optional<SortExecutor<Document>> _sortExecutor; boost::optional<SortKeyGenerator> _sortKeyGen; }; |