summaryrefslogtreecommitdiff
path: root/src/mongo/db/sorter
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-02-02 13:40:49 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-02 14:43:55 +0000
commitd6a545506f7842ec6bc369ed7aedcdb8c2d40cd6 (patch)
tree86cf0f8a1f7fb237c66df5640ea8019b1d064ba8 /src/mongo/db/sorter
parentf4aaa34d623e7385b2ac5b332ee07ece1f22c428 (diff)
downloadmongo-d6a545506f7842ec6bc369ed7aedcdb8c2d40cd6.tar.gz
SERVER-60331 Make ExternalSorter respect memory limits
Diffstat (limited to 'src/mongo/db/sorter')
-rw-r--r--src/mongo/db/sorter/sorter.cpp176
-rw-r--r--src/mongo/db/sorter/sorter.h3
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp38
3 files changed, 189 insertions, 28 deletions
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index e8fece6cff2..224f302a9cc 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -65,6 +65,16 @@
#include "mongo/util/destructor_guard.h"
#include "mongo/util/str.h"
+// As this file is included in various places we need to handle the case of having the log header
+// already included
+#ifndef MONGO_LOGV2_DEFAULT_COMPONENT
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+#endif
+
+#ifndef MONGO_UTIL_LOGV2_H_
+#include "mongo/logv2/log.h"
+#endif
+
namespace mongo {
namespace {
@@ -104,6 +114,8 @@ EncryptionHooks* getEncryptionHooksIfEnabled() {
return encryptionHooks;
}
+constexpr std::size_t kSortedFileBufferSize = 64 * 1024;
+
} // namespace
namespace sorter {
@@ -509,18 +521,129 @@ private:
};
template <typename Key, typename Value, typename Comparator>
-class NoLimitSorter : public Sorter<Key, Value> {
+class MergeableSorter : public Sorter<Key, Value> {
public:
- typedef std::pair<Key, Value> Data;
- typedef SortIteratorInterface<Key, Value> Iterator;
typedef std::pair<typename Key::SorterDeserializeSettings,
typename Value::SorterDeserializeSettings>
Settings;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+
+ MergeableSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings)
+ : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) {}
+
+ MergeableSorter(const SortOptions& opts,
+ const std::string& fileName,
+ const Comparator& comp,
+ const Settings& settings)
+ : Sorter<Key, Value>(opts, fileName), _comp(comp), _settings(settings) {}
+
+protected:
+ /**
+ * Merge the spills in order to approximately respect memory usage. This method will calculate
+ * the number of spills that can be merged simultaneously in order to respect memory limits and
+ * reduce the spills to that number if necessary by merging them iteratively.
+ */
+ void _mergeSpillsToRespectMemoryLimits() {
+ auto numTargetedSpills = std::max(this->_opts.maxMemoryUsageBytes / kSortedFileBufferSize,
+ static_cast<std::size_t>(2));
+ if (this->_iters.size() > numTargetedSpills) {
+ this->_mergeSpills(numTargetedSpills);
+ }
+ }
+
+ /**
+ * An implementation of a k-way merge sort.
+ *
+ * This method will take a target number of sorted spills to merge and will proceed to merge the
+ * set of them in batches of at most numTargetedSpills until it reaches the target.
+ *
+ * To give an example, if we have 5 spills and a target number of 2 the algorithm will do the
+ * following:
+ *
+ * {1, 2, 3, 4, 5}
+ * {12, 3, 4, 5}
+ * {12, 34, 5}
+ * {1234, 5}
+ */
+ void _mergeSpills(std::size_t numTargetedSpills) {
+ using File = typename Sorter<Key, Value>::File;
+
+ std::shared_ptr<File> file = std::move(this->_file);
+ std::vector<std::shared_ptr<Iterator>> iterators = std::move(this->_iters);
+
+ LOGV2_INFO(6033104,
+ "Number of spills exceeds maximum spills to merge at a time, proceeding to "
+ "merge them to reduce the number",
+ "currentNumSpills"_attr = iterators.size(),
+ "maxNumSpills"_attr = numTargetedSpills);
+
+ while (iterators.size() > numTargetedSpills) {
+ std::shared_ptr<File> newSpillsFile =
+ std::make_shared<File>(this->_opts.tempDir + "/" + nextFileName());
+
+ LOGV2_DEBUG(6033103,
+ 1,
+ "Created new intermediate file for merged spills",
+ "path"_attr = newSpillsFile->path().string());
+
+ std::vector<std::shared_ptr<Iterator>> mergedIterators;
+ for (std::size_t i = 0; i < iterators.size(); i += numTargetedSpills) {
+ std::vector<std::shared_ptr<Iterator>> spillsToMerge;
+ auto endIndex = std::min(i + numTargetedSpills, iterators.size());
+ std::move(iterators.begin() + i,
+ iterators.begin() + endIndex,
+ std::back_inserter(spillsToMerge));
+
+ LOGV2_DEBUG(6033102,
+ 2,
+ "Merging spills",
+ "beginIdx"_attr = i,
+ "endIdx"_attr = endIndex - 1);
+
+ auto mergeIterator =
+ std::unique_ptr<Iterator>(Iterator::merge(spillsToMerge, this->_opts, _comp));
+ mergeIterator->openSource();
+ SortedFileWriter<Key, Value> writer(this->_opts, newSpillsFile, _settings);
+ while (mergeIterator->more()) {
+ auto pair = mergeIterator->next();
+ writer.addAlreadySorted(pair.first, pair.second);
+ }
+ auto iteratorPtr = std::shared_ptr<Iterator>(writer.done());
+ mergeIterator->closeSource();
+ mergedIterators.push_back(std::move(iteratorPtr));
+ this->_numSpills++;
+ }
+
+ LOGV2_DEBUG(6033101,
+ 1,
+ "Merged spills",
+ "currentNumSpills"_attr = mergedIterators.size(),
+ "targetSpills"_attr = numTargetedSpills);
+
+ iterators = std::move(mergedIterators);
+ file = std::move(newSpillsFile);
+ }
+ this->_file = std::move(file);
+ this->_iters = std::move(iterators);
+
+ LOGV2_INFO(6033100, "Finished merging spills");
+ }
+
+ const Comparator _comp;
+ const Settings _settings;
+};
+
+template <typename Key, typename Value, typename Comparator>
+class NoLimitSorter : public MergeableSorter<Key, Value, Comparator> {
+public:
+ typedef std::pair<Key, Value> Data;
+ using Iterator = typename MergeableSorter<Key, Value, Comparator>::Iterator;
+ using Settings = typename MergeableSorter<Key, Value, Comparator>::Settings;
NoLimitSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) {
+ : MergeableSorter<Key, Value, Comparator>(opts, comp, settings) {
invariant(opts.limit == 0);
}
@@ -529,7 +652,7 @@ public:
const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : Sorter<Key, Value>(opts, fileName), _comp(comp), _settings(settings) {
+ : MergeableSorter<Key, Value, Comparator>(opts, fileName, comp, settings) {
invariant(opts.extSortAllowed);
uassert(16815,
@@ -549,6 +672,7 @@ public:
this->_opts.dbName,
range.getChecksum());
});
+ this->_numSpills = this->_iters.size();
}
void add(const Key& key, const Value& val) {
@@ -589,7 +713,9 @@ public:
}
spill();
- return Iterator::merge(this->_iters, this->_opts, _comp);
+ this->_mergeSpillsToRespectMemoryLimits();
+
+ return Iterator::merge(this->_iters, this->_opts, this->_comp);
}
private:
@@ -606,7 +732,7 @@ private:
};
void sort() {
- STLComparator less(_comp);
+ STLComparator less(this->_comp);
std::stable_sort(_data.begin(), _data.end(), less);
this->_numSorted += _data.size();
}
@@ -628,7 +754,7 @@ private:
sort();
- SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, this->_settings);
for (; !_data.empty(); _data.pop_front()) {
writer.addAlreadySorted(_data.front().first, _data.front().second);
}
@@ -637,10 +763,10 @@ private:
this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
+
+ this->_numSpills++;
}
- const Comparator _comp;
- const Settings _settings;
bool _done = false;
size_t _memUsed = 0;
std::deque<Data> _data; // Data that has not been spilled.
@@ -696,20 +822,16 @@ private:
};
template <typename Key, typename Value, typename Comparator>
-class TopKSorter : public Sorter<Key, Value> {
+class TopKSorter : public MergeableSorter<Key, Value, Comparator> {
public:
typedef std::pair<Key, Value> Data;
- typedef SortIteratorInterface<Key, Value> Iterator;
- typedef std::pair<typename Key::SorterDeserializeSettings,
- typename Value::SorterDeserializeSettings>
- Settings;
+ using Iterator = typename MergeableSorter<Key, Value, Comparator>::Iterator;
+ using Settings = typename MergeableSorter<Key, Value, Comparator>::Settings;
TopKSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : Sorter<Key, Value>(opts),
- _comp(comp),
- _settings(settings),
+ : MergeableSorter<Key, Value, Comparator>(opts, comp, settings),
_memUsed(0),
_haveCutoff(false),
_worstCount(0),
@@ -731,7 +853,7 @@ public:
this->_numSorted += 1;
- STLComparator less(_comp);
+ STLComparator less(this->_comp);
Data contender(key, val);
if (_data.size() < this->_opts.limit) {
@@ -785,7 +907,9 @@ public:
}
spill();
- Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp);
+ this->_mergeSpillsToRespectMemoryLimits();
+
+ Iterator* iterator = Iterator::merge(this->_iters, this->_opts, this->_comp);
_done = true;
return iterator;
}
@@ -804,7 +928,7 @@ private:
};
void sort() {
- STLComparator less(_comp);
+ STLComparator less(this->_comp);
if (_data.size() == this->_opts.limit) {
std::sort_heap(_data.begin(), _data.end(), less);
@@ -856,7 +980,7 @@ private:
// all space complexities measure disk space rather than memory since this class is
// O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
- STLComparator less(_comp); // less is "better" for TopK.
+ STLComparator less(this->_comp); // less is "better" for TopK.
// Pick a new _worstSeen or _lastMedian if should.
if (_worstCount == 0 || less(_worstSeen, _data.back())) {
@@ -914,7 +1038,7 @@ private:
sort();
updateCutoff();
- SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, this->_settings);
for (size_t i = 0; i < _data.size(); i++) {
writer.addAlreadySorted(_data[i].first, _data[i].second);
}
@@ -926,10 +1050,10 @@ private:
this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
+
+ this->_numSpills++;
}
- const Comparator _comp;
- const Settings _settings;
bool _done = false;
size_t _memUsed;
@@ -1121,7 +1245,7 @@ void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value&
_checksum =
addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum);
- if (_buffer.len() > 64 * 1024)
+ if (_buffer.len() > static_cast<int>(kSortedFileBufferSize))
spill();
}
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 357643b09b8..8a0c696adde 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -337,7 +337,7 @@ public:
virtual ~Sorter() {}
size_t numSpills() const {
- return _iters.size();
+ return _numSpills;
}
size_t numSorted() const {
@@ -362,6 +362,7 @@ protected:
std::shared_ptr<File> _file;
+ std::size_t _numSpills = 0; // Keeps track of the number of spills that have happened.
std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled.
};
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 4607fef685c..5c3c1ed74a3 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -459,6 +459,19 @@ public:
ASSERT(boost::filesystem::is_empty(tempDir.path()));
} else {
ASSERT(!boost::filesystem::is_empty(tempDir.path()));
+ auto path = boost::filesystem::path(tempDir.path());
+ auto directoryIterator = boost::filesystem::directory_iterator(path);
+ auto numFiles = std::count_if(
+ directoryIterator, boost::filesystem::directory_iterator(), [](const auto& elem) {
+ return boost::filesystem::is_regular_file(elem);
+ });
+#if defined(MONGO_CONFIG_DEBUG_BUILD)
+ // Two sorters have executed
+ ASSERT_EQ(numFiles, 2);
+#else
+ // Six sorters have executed
+ ASSERT_EQ(numFiles, 6);
+#endif
}
}
@@ -489,6 +502,10 @@ public:
return 0;
}
+ virtual size_t correctNumSpills() const {
+ return 0;
+ }
+
// It is safe to ignore / overwrite any part of options
virtual SortOptions adjustSortOptions(SortOptions opts) {
return opts;
@@ -509,11 +526,13 @@ private:
if (numRanges == 0)
return;
+ auto numSpillsOccurred = correctNumSpills();
auto state = sorter->persistDataForShutdown();
if (opts.extSortAllowed) {
ASSERT_NE(state.fileName, "");
}
ASSERT_EQ(state.ranges.size(), numRanges);
+ ASSERT_EQ(sorter->numSpills(), numSpillsOccurred);
}
};
@@ -609,9 +628,26 @@ public:
}
size_t correctNumRanges() const override {
+ return std::max(static_cast<std::size_t>(MEM_LIMIT / kSortedFileBufferSize),
+ static_cast<std::size_t>(2));
+ }
+
+ size_t correctNumSpills() const override {
// We add 1 to the calculation since the call to persistDataForShutdown() spills the
// remaining in-memory Sorter data to disk, adding one extra range.
- return NUM_ITEMS * sizeof(IWPair) / MEM_LIMIT + 1;
+ std::size_t spillsToMerge = NUM_ITEMS * sizeof(IWPair) / MEM_LIMIT + 1;
+ // As the spills may get merged we'll account for the intermediate spills that happen.
+ std::size_t spillsDone = spillsToMerge;
+ std::size_t targetRanges = correctNumRanges();
+ while (spillsToMerge > targetRanges) {
+ auto newSpills = spillsToMerge / targetRanges;
+ spillsDone += newSpills;
+ if ((spillsToMerge % targetRanges) > 0) {
+ spillsDone++;
+ }
+ spillsToMerge = newSpills;
+ }
+ return spillsDone;
}
enum Constants {