From d6a545506f7842ec6bc369ed7aedcdb8c2d40cd6 Mon Sep 17 00:00:00 2001 From: Jordi Olivares Provencio Date: Wed, 2 Feb 2022 13:40:49 +0000 Subject: SERVER-60331 Make ExternalSorter respect memory limits --- src/mongo/db/sorter/sorter.cpp | 176 ++++++++++++++++++++++++++++++------ src/mongo/db/sorter/sorter.h | 3 +- src/mongo/db/sorter/sorter_test.cpp | 38 +++++++- 3 files changed, 189 insertions(+), 28 deletions(-) (limited to 'src/mongo/db/sorter') diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index e8fece6cff2..224f302a9cc 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -65,6 +65,16 @@ #include "mongo/util/destructor_guard.h" #include "mongo/util/str.h" +// As this file is included in various places we need to handle the case of having the log header +// already included +#ifndef MONGO_LOGV2_DEFAULT_COMPONENT +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault +#endif + +#ifndef MONGO_UTIL_LOGV2_H_ +#include "mongo/logv2/log.h" +#endif + namespace mongo { namespace { @@ -104,6 +114,8 @@ EncryptionHooks* getEncryptionHooksIfEnabled() { return encryptionHooks; } +constexpr std::size_t kSortedFileBufferSize = 64 * 1024; + } // namespace namespace sorter { @@ -509,18 +521,129 @@ private: }; template -class NoLimitSorter : public Sorter { +class MergeableSorter : public Sorter { public: - typedef std::pair Data; - typedef SortIteratorInterface Iterator; typedef std::pair Settings; + typedef SortIteratorInterface Iterator; + + MergeableSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings) + : Sorter(opts), _comp(comp), _settings(settings) {} + + MergeableSorter(const SortOptions& opts, + const std::string& fileName, + const Comparator& comp, + const Settings& settings) + : Sorter(opts, fileName), _comp(comp), _settings(settings) {} + +protected: + /** + * Merge the spills in order to approximately respect memory usage. This method will calculate + * the number of spills that can be merged simultaneously in order to respect memory limits and + * reduce the spills to that number if necessary by merging them iteratively. + */ + void _mergeSpillsToRespectMemoryLimits() { + auto numTargetedSpills = std::max(this->_opts.maxMemoryUsageBytes / kSortedFileBufferSize, + static_cast(2)); + if (this->_iters.size() > numTargetedSpills) { + this->_mergeSpills(numTargetedSpills); + } + } + + /** + * An implementation of a k-way merge sort. + * + * This method will take a target number of sorted spills to merge and will proceed to merge the + * set of them in batches of at most numTargetedSpills until it reaches the target. + * + * To give an example, if we have 5 spills and a target number of 2 the algorithm will do the + * following: + * + * {1, 2, 3, 4, 5} + * {12, 3, 4, 5} + * {12, 34, 5} + * {1234, 5} + */ + void _mergeSpills(std::size_t numTargetedSpills) { + using File = typename Sorter::File; + + std::shared_ptr file = std::move(this->_file); + std::vector> iterators = std::move(this->_iters); + + LOGV2_INFO(6033104, + "Number of spills exceeds maximum spills to merge at a time, proceeding to " + "merge them to reduce the number", + "currentNumSpills"_attr = iterators.size(), + "maxNumSpills"_attr = numTargetedSpills); + + while (iterators.size() > numTargetedSpills) { + std::shared_ptr newSpillsFile = + std::make_shared(this->_opts.tempDir + "/" + nextFileName()); + + LOGV2_DEBUG(6033103, + 1, + "Created new intermediate file for merged spills", + "path"_attr = newSpillsFile->path().string()); + + std::vector> mergedIterators; + for (std::size_t i = 0; i < iterators.size(); i += numTargetedSpills) { + std::vector> spillsToMerge; + auto endIndex = std::min(i + numTargetedSpills, iterators.size()); + std::move(iterators.begin() + i, + iterators.begin() + endIndex, + std::back_inserter(spillsToMerge)); + + LOGV2_DEBUG(6033102, + 2, + "Merging spills", + "beginIdx"_attr = i, + "endIdx"_attr = endIndex - 1); + + auto mergeIterator = + std::unique_ptr(Iterator::merge(spillsToMerge, this->_opts, _comp)); + mergeIterator->openSource(); + SortedFileWriter writer(this->_opts, newSpillsFile, _settings); + while (mergeIterator->more()) { + auto pair = mergeIterator->next(); + writer.addAlreadySorted(pair.first, pair.second); + } + auto iteratorPtr = std::shared_ptr(writer.done()); + mergeIterator->closeSource(); + mergedIterators.push_back(std::move(iteratorPtr)); + this->_numSpills++; + } + + LOGV2_DEBUG(6033101, + 1, + "Merged spills", + "currentNumSpills"_attr = mergedIterators.size(), + "targetSpills"_attr = numTargetedSpills); + + iterators = std::move(mergedIterators); + file = std::move(newSpillsFile); + } + this->_file = std::move(file); + this->_iters = std::move(iterators); + + LOGV2_INFO(6033100, "Finished merging spills"); + } + + const Comparator _comp; + const Settings _settings; +}; + +template +class NoLimitSorter : public MergeableSorter { +public: + typedef std::pair Data; + using Iterator = typename MergeableSorter::Iterator; + using Settings = typename MergeableSorter::Settings; NoLimitSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : Sorter(opts), _comp(comp), _settings(settings) { + : MergeableSorter(opts, comp, settings) { invariant(opts.limit == 0); } @@ -529,7 +652,7 @@ public: const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : Sorter(opts, fileName), _comp(comp), _settings(settings) { + : MergeableSorter(opts, fileName, comp, settings) { invariant(opts.extSortAllowed); uassert(16815, @@ -549,6 +672,7 @@ public: this->_opts.dbName, range.getChecksum()); }); + this->_numSpills = this->_iters.size(); } void add(const Key& key, const Value& val) { @@ -589,7 +713,9 @@ public: } spill(); - return Iterator::merge(this->_iters, this->_opts, _comp); + this->_mergeSpillsToRespectMemoryLimits(); + + return Iterator::merge(this->_iters, this->_opts, this->_comp); } private: @@ -606,7 +732,7 @@ private: }; void sort() { - STLComparator less(_comp); + STLComparator less(this->_comp); std::stable_sort(_data.begin(), _data.end(), less); this->_numSorted += _data.size(); } @@ -628,7 +754,7 @@ private: sort(); - SortedFileWriter writer(this->_opts, this->_file, _settings); + SortedFileWriter writer(this->_opts, this->_file, this->_settings); for (; !_data.empty(); _data.pop_front()) { writer.addAlreadySorted(_data.front().first, _data.front().second); } @@ -637,10 +763,10 @@ private: this->_iters.push_back(std::shared_ptr(iteratorPtr)); _memUsed = 0; + + this->_numSpills++; } - const Comparator _comp; - const Settings _settings; bool _done = false; size_t _memUsed = 0; std::deque _data; // Data that has not been spilled. @@ -696,20 +822,16 @@ private: }; template -class TopKSorter : public Sorter { +class TopKSorter : public MergeableSorter { public: typedef std::pair Data; - typedef SortIteratorInterface Iterator; - typedef std::pair - Settings; + using Iterator = typename MergeableSorter::Iterator; + using Settings = typename MergeableSorter::Settings; TopKSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : Sorter(opts), - _comp(comp), - _settings(settings), + : MergeableSorter(opts, comp, settings), _memUsed(0), _haveCutoff(false), _worstCount(0), @@ -731,7 +853,7 @@ public: this->_numSorted += 1; - STLComparator less(_comp); + STLComparator less(this->_comp); Data contender(key, val); if (_data.size() < this->_opts.limit) { @@ -785,7 +907,9 @@ public: } spill(); - Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp); + this->_mergeSpillsToRespectMemoryLimits(); + + Iterator* iterator = Iterator::merge(this->_iters, this->_opts, this->_comp); _done = true; return iterator; } @@ -804,7 +928,7 @@ private: }; void sort() { - STLComparator less(_comp); + STLComparator less(this->_comp); if (_data.size() == this->_opts.limit) { std::sort_heap(_data.begin(), _data.end(), less); @@ -856,7 +980,7 @@ private: // all space complexities measure disk space rather than memory since this class is // O(1) in memory due to the _opts.maxMemoryUsageBytes limit. - STLComparator less(_comp); // less is "better" for TopK. + STLComparator less(this->_comp); // less is "better" for TopK. // Pick a new _worstSeen or _lastMedian if should. if (_worstCount == 0 || less(_worstSeen, _data.back())) { @@ -914,7 +1038,7 @@ private: sort(); updateCutoff(); - SortedFileWriter writer(this->_opts, this->_file, _settings); + SortedFileWriter writer(this->_opts, this->_file, this->_settings); for (size_t i = 0; i < _data.size(); i++) { writer.addAlreadySorted(_data[i].first, _data[i].second); } @@ -926,10 +1050,10 @@ private: this->_iters.push_back(std::shared_ptr(iteratorPtr)); _memUsed = 0; + + this->_numSpills++; } - const Comparator _comp; - const Settings _settings; bool _done = false; size_t _memUsed; @@ -1121,7 +1245,7 @@ void SortedFileWriter::addAlreadySorted(const Key& key, const Value& _checksum = addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum); - if (_buffer.len() > 64 * 1024) + if (_buffer.len() > static_cast(kSortedFileBufferSize)) spill(); } diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 357643b09b8..8a0c696adde 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -337,7 +337,7 @@ public: virtual ~Sorter() {} size_t numSpills() const { - return _iters.size(); + return _numSpills; } size_t numSorted() const { @@ -362,6 +362,7 @@ protected: std::shared_ptr _file; + std::size_t _numSpills = 0; // Keeps track of the number of spills that have happened. std::vector> _iters; // Data that has already been spilled. }; diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 4607fef685c..5c3c1ed74a3 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -459,6 +459,19 @@ public: ASSERT(boost::filesystem::is_empty(tempDir.path())); } else { ASSERT(!boost::filesystem::is_empty(tempDir.path())); + auto path = boost::filesystem::path(tempDir.path()); + auto directoryIterator = boost::filesystem::directory_iterator(path); + auto numFiles = std::count_if( + directoryIterator, boost::filesystem::directory_iterator(), [](const auto& elem) { + return boost::filesystem::is_regular_file(elem); + }); +#if defined(MONGO_CONFIG_DEBUG_BUILD) + // Two sorters have executed + ASSERT_EQ(numFiles, 2); +#else + // Six sorters have executed + ASSERT_EQ(numFiles, 6); +#endif } } @@ -489,6 +502,10 @@ public: return 0; } + virtual size_t correctNumSpills() const { + return 0; + } + // It is safe to ignore / overwrite any part of options virtual SortOptions adjustSortOptions(SortOptions opts) { return opts; @@ -509,11 +526,13 @@ private: if (numRanges == 0) return; + auto numSpillsOccurred = correctNumSpills(); auto state = sorter->persistDataForShutdown(); if (opts.extSortAllowed) { ASSERT_NE(state.fileName, ""); } ASSERT_EQ(state.ranges.size(), numRanges); + ASSERT_EQ(sorter->numSpills(), numSpillsOccurred); } }; @@ -609,9 +628,26 @@ public: } size_t correctNumRanges() const override { + return std::max(static_cast(MEM_LIMIT / kSortedFileBufferSize), + static_cast(2)); + } + + size_t correctNumSpills() const override { // We add 1 to the calculation since the call to persistDataForShutdown() spills the // remaining in-memory Sorter data to disk, adding one extra range. - return NUM_ITEMS * sizeof(IWPair) / MEM_LIMIT + 1; + std::size_t spillsToMerge = NUM_ITEMS * sizeof(IWPair) / MEM_LIMIT + 1; + // As the spills may get merged we'll account for the intermediate spills that happen. + std::size_t spillsDone = spillsToMerge; + std::size_t targetRanges = correctNumRanges(); + while (spillsToMerge > targetRanges) { + auto newSpills = spillsToMerge / targetRanges; + spillsDone += newSpills; + if ((spillsToMerge % targetRanges) > 0) { + spillsDone++; + } + spillsToMerge = newSpills; + } + return spillsDone; } enum Constants { -- cgit v1.2.1