diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2021-09-23 14:40:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-23 14:56:17 +0000 |
commit | a9efdaf12d5f8a2c9ba9610d21fafdd0eeb8551e (patch) | |
tree | 8f2d986b64a571d317ee093bbc6602ddb9bc4f36 | |
parent | 0289da9fb06c5af672bf4220176b735dcc910815 (diff) | |
download | mongo-a9efdaf12d5f8a2c9ba9610d21fafdd0eeb8551e.tar.gz |
SERVER-54791 Use single file descriptor for external sort
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 6 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.cpp | 404 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 100 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 23 |
5 files changed, 298 insertions, 264 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index c7bf44e72e0..a82d90b9be3 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -29,7 +29,6 @@ #include "mongo/platform/basic.h" -#include <boost/filesystem/operations.hpp> #include <memory> #include "mongo/db/exec/document_value/document.h" @@ -377,20 +376,14 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _memoryTracker{pExpCtx->allowDiskUse && !pExpCtx->inMongos, maxMemoryUsageBytes ? *maxMemoryUsageBytes : internalDocumentSourceGroupMaxMemoryBytes.load()}, + // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system. + _file(!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild) + ? std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" + + nextFileName()) + : nullptr), _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), - _spilled(false) { - if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) { - // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system. - _fileName = pExpCtx->tempDir + "/" + nextFileName(); - } -} - -DocumentSourceGroup::~DocumentSourceGroup() { - if (_ownsFileDeletion) { - DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); - } -} + _spilled(false) {} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { _accumulatedFields.push_back(accumulationStatement); @@ -595,11 +588,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( - _sortedFiles, - _fileName, - SortOptions(), - SorterComparator(pExpCtx->getValueComparator()))); - _ownsFileDeletion = false; + _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); // prepare current to accumulate data _currentAccumulators.reserve(numAccumulators); @@ -637,8 +626,7 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); - SortedFileWriter<Value, Value> writer( - SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset); + SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file); switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i. case 0: // no values, essentially a distinct for (size_t i = 0; i < ptrs.size(); i++) { @@ -667,7 +655,6 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { _groups->clear(); Sorter<Value, Value>::Iterator* iteratorPtr = writer.done(); - _nextSortedFileWriterOffset = writer.getFileEndOffset(); return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr); } diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index ec202833ead..ee8ad1f2f14 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -200,8 +200,6 @@ private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, boost::optional<size_t> maxMemoryUsageBytes = boost::none); - ~DocumentSourceGroup(); - /** * getNext() dispatches to one of these three depending on what type of $group it is. These * methods expect '_currentAccumulators' to have been reset before being called, and also expect @@ -260,9 +258,7 @@ private: MemoryUsageTracker _memoryTracker; - std::string _fileName; - std::streampos _nextSortedFileWriterOffset = 0; - bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over. + std::shared_ptr<Sorter<Value, Value>::File> _file; std::vector<std::string> _idFieldNames; // used when id is a document std::vector<boost::intrusive_ptr<Expression>> _idExpressions; diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index 661b2effab0..39639153d7b 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -84,8 +84,6 @@ uint32_t addDataToChecksum(const void* startOfData, size_t sizeOfData, uint32_t namespace sorter { -using std::shared_ptr; - // We need to use the "real" errno everywhere, not GetLastError() on Windows inline std::string myErrnoWithDescription() { int errnoCopy = errno; @@ -168,42 +166,21 @@ public: Settings; typedef std::pair<Key, Value> Data; - FileIterator(const std::string& fileName, - std::streampos fileStartOffset, - std::streampos fileEndOffset, + FileIterator(std::shared_ptr<typename Sorter<Key, Value>::File> file, + std::streamoff fileStartOffset, + std::streamoff fileEndOffset, const Settings& settings, const uint32_t checksum) : _settings(settings), - _done(false), - _fileName(fileName), + _file(std::move(file)), _fileStartOffset(fileStartOffset), + _fileCurrentOffset(fileStartOffset), _fileEndOffset(fileEndOffset), - _originalChecksum(checksum) { - uassert(16815, - str::stream() << "unexpected empty file: " << _fileName, - boost::filesystem::file_size(_fileName) != 0); - } + _originalChecksum(checksum) {} - void openSource() { - _file.open(_fileName.c_str(), std::ios::in | std::ios::binary); - uassert(16814, - str::stream() << "error opening file \"" << _fileName - << "\": " << myErrnoWithDescription(), - _file.good()); - _file.seekg(_fileStartOffset); - uassert(50979, - str::stream() << "error seeking starting offset of '" << _fileStartOffset - << "' in file \"" << _fileName << "\": " << myErrnoWithDescription(), - _file.good()); - } + void openSource() {} void closeSource() { - _file.close(); - uassert(50969, - str::stream() << "error closing file \"" << _fileName - << "\": " << myErrnoWithDescription(), - !_file.fail()); - // If the file iterator reads through all data objects, we can ensure non-corrupt data // by comparing the newly calculated checksum with the original checksum from the data // written to disk. Some iterators do not read back all data from the file, which prohibits @@ -219,13 +196,13 @@ public: bool more() { if (!_done) - fillBufferIfNeeded(); // may change _done + _fillBufferIfNeeded(); // may change _done return !_done; } Data next() { - verify(!_done); - fillBufferIfNeeded(); + invariant(!_done); + _fillBufferIfNeeded(); const char* startOfNewData = static_cast<const char*>(_bufferReader->pos()); @@ -250,20 +227,20 @@ private: /** * Attempts to refill the _bufferReader if it is empty. Expects _done to be false. */ - void fillBufferIfNeeded() { - verify(!_done); + void _fillBufferIfNeeded() { + invariant(!_done); if (!_bufferReader || _bufferReader->atEof()) - fillBufferFromDisk(); + _fillBufferFromDisk(); } /** * Tries to read from disk and places any results in _bufferReader. If there is no more data to * read, then _done is set to true and the function returns immediately. */ - void fillBufferFromDisk() { + void _fillBufferFromDisk() { int32_t rawSize; - read(&rawSize, sizeof(rawSize)); + _read(&rawSize, sizeof(rawSize)); if (_done) return; @@ -272,7 +249,7 @@ private: int32_t blockSize = std::abs(rawSize); _buffer.reset(new char[blockSize]); - read(_buffer.get(), blockSize); + _read(_buffer.get(), blockSize); uassert(16816, "file too short?", !_done); auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext()); @@ -316,41 +293,31 @@ private: /** * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset. - * - * Masserts on any file errors */ - void read(void* out, size_t size) { - invariant(_file.is_open()); - - const std::streampos offset = _file.tellg(); - uassert(51049, - str::stream() << "error reading file \"" << _fileName - << "\": " << myErrnoWithDescription(), - offset >= 0); - - if (offset >= _fileEndOffset) { - invariant(offset == _fileEndOffset); + void _read(void* out, size_t size) { + if (_fileCurrentOffset == _fileEndOffset) { _done = true; return; } - _file.read(reinterpret_cast<char*>(out), size); - uassert(16817, - str::stream() << "error reading file \"" << _fileName - << "\": " << myErrnoWithDescription(), - _file.good()); - verify(_file.gcount() == static_cast<std::streamsize>(size)); + invariant(_fileCurrentOffset < _fileEndOffset, + str::stream() << "Current file offset (" << _fileCurrentOffset + << ") greater than end offset (" << _fileEndOffset << ")"); + + _file->read(_fileCurrentOffset, size, out); + _fileCurrentOffset += size; } const Settings _settings; - bool _done; + bool _done = false; std::unique_ptr<char[]> _buffer; std::unique_ptr<BufReader> _bufferReader; - std::string _fileName; // File containing the sorted data range. - std::streampos _fileStartOffset; // File offset at which the sorted data range starts. - std::streampos _fileEndOffset; // File offset at which the sorted data range ends. - std::ifstream _file; + std::shared_ptr<typename Sorter<Key, Value>::File> + _file; // File containing the sorted data range. + std::streamoff _fileStartOffset; // File offset at which the sorted data range starts. + std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from. + std::streamoff _fileEndOffset; // File offset at which the sorted data range ends. // Checksum value that is updated with each read of a data object from disk. We can compare // this value with _originalChecksum to check for data corruption if and only if the @@ -375,14 +342,12 @@ public: typedef std::pair<Key, Value> Data; MergeIterator(const std::vector<std::shared_ptr<Input>>& iters, - const std::string& itersSourceFileName, const SortOptions& opts, const Comparator& comp) : _opts(opts), _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()), _first(true), - _greater(comp), - _itersSourceFileName(itersSourceFileName) { + _greater(comp) { for (size_t i = 0; i < iters.size(); i++) { iters[i]->openSource(); if (iters[i]->more()) { @@ -404,11 +369,8 @@ public: } ~MergeIterator() { - // Clear the remaining Stream objects first, to close the file handles before deleting the - // file. Some systems will error closing the file if any file handles are still open. _current.reset(); _heap.clear(); - DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName)); } void openSource() {} @@ -509,7 +471,6 @@ private: std::shared_ptr<Stream> _current; std::vector<std::shared_ptr<Stream>> _heap; // MinHeap STLComparator _greater; // named so calls make sense - std::string _itersSourceFileName; }; template <typename Key, typename Value, typename Comparator> @@ -524,19 +485,8 @@ public: NoLimitSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) { - verify(_opts.limit == 0); - if (_opts.extSortAllowed) { - _fileName = _opts.tempDir + "/" + nextFileName(); - } - } - - ~NoLimitSorter() { - if (!_done) { - // If done() was never called to return a MergeIterator, then this Sorter still owns - // file deletion. - DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); - } + : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) { + invariant(opts.limit == 0); } void add(const Key& key, const Value& val) { @@ -547,22 +497,20 @@ public: _memUsed += key.memUsageForSorter(); _memUsed += val.memUsageForSorter(); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); } Iterator* done() { - invariant(!_done); + invariant(!std::exchange(_done, true)); - if (_iters.empty()) { + if (this->_iters.empty()) { sort(); return new InMemIterator<Key, Value>(_data); } spill(); - Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp); - _done = true; - return mergeIt; + return Iterator::merge(this->_iters, this->_opts, _comp); } private: @@ -581,53 +529,41 @@ private: void sort() { STLComparator less(_comp); std::stable_sort(_data.begin(), _data.end(), less); - - // Does 2x more compares than stable_sort - // TODO test on windows - // std::sort(_data.begin(), _data.end(), comp); } void spill() { - invariant(!_done); - - this->_usedDisk = true; if (_data.empty()) return; - if (!_opts.extSortAllowed) { + if (!this->_opts.extSortAllowed) { // This error message only applies to sorts from user queries made through the find or // aggregation commands. Other clients, such as bulk index builds, should suppress this // error, either by allowing external sorting or by catching and throwing a more // appropriate error. uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes - << " bytes, but did not opt in to external sorting."); + str::stream() + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes + << " bytes, but did not opt in to external sorting."); } sort(); - SortedFileWriter<Key, Value> writer( - _opts, _fileName, _nextSortedFileWriterOffset, _settings); + SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings); for (; !_data.empty(); _data.pop_front()) { writer.addAlreadySorted(_data.front().first, _data.front().second); } Iterator* iteratorPtr = writer.done(); - _nextSortedFileWriterOffset = writer.getFileEndOffset(); - _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); + this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } const Comparator _comp; const Settings _settings; - SortOptions _opts; - std::string _fileName; - std::streampos _nextSortedFileWriterOffset = 0; bool _done = false; - size_t _memUsed; - std::deque<Data> _data; // the "current" data - std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled + size_t _memUsed = 0; + std::deque<Data> _data; // Data that has not been spilled. }; template <typename Key, typename Value, typename Comparator> @@ -666,6 +602,10 @@ public: } private: + void spill() { + invariant(false, "LimitOneSorter does not spill to disk"); + } + const Comparator _comp; Data _best; bool _haveData; // false at start, set to true on first call to add() @@ -683,19 +623,15 @@ public: TopKSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : _comp(comp), + : Sorter<Key, Value>(opts), + _comp(comp), _settings(settings), - _opts(opts), _memUsed(0), _haveCutoff(false), _worstCount(0), _medianCount(0) { // This also *works* with limit==1 but LimitOneSorter should be used instead - verify(_opts.limit > 1); - - if (_opts.extSortAllowed) { - _fileName = _opts.tempDir + "/" + nextFileName(); - } + invariant(opts.limit > 1); // Preallocate a fixed sized vector of the required size if we don't expect it to have a // major impact on our memory budget. This is the common case with small limits. @@ -706,21 +642,13 @@ public: } } - ~TopKSorter() { - if (!_done) { - // If done() was never called to return a MergeIterator, then this Sorter still owns - // file deletion. - DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); - } - } - void add(const Key& key, const Value& val) { invariant(!_done); STLComparator less(_comp); Data contender(key, val); - if (_data.size() < _opts.limit) { + if (_data.size() < this->_opts.limit) { if (_haveCutoff && !less(contender, _cutoff)) return; @@ -729,16 +657,16 @@ public: _memUsed += key.memUsageForSorter(); _memUsed += val.memUsageForSorter(); - if (_data.size() == _opts.limit) + if (_data.size() == this->_opts.limit) std::make_heap(_data.begin(), _data.end(), less); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); return; } - verify(_data.size() == _opts.limit); + invariant(_data.size() == this->_opts.limit); if (!less(contender, _data.front())) return; // not good enough @@ -755,18 +683,18 @@ public: _data.back() = {contender.first.getOwned(), contender.second.getOwned()}; std::push_heap(_data.begin(), _data.end(), less); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); } Iterator* done() { - if (_iters.empty()) { + if (this->_iters.empty()) { sort(); return new InMemIterator<Key, Value>(_data); } spill(); - Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp); + Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp); _done = true; return iterator; } @@ -787,7 +715,7 @@ private: void sort() { STLComparator less(_comp); - if (_data.size() == _opts.limit) { + if (_data.size() == this->_opts.limit) { std::sort_heap(_data.begin(), _data.end(), less); } else { std::stable_sort(_data.begin(), _data.end(), less); @@ -856,14 +784,14 @@ private: // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should. - if (_worstCount >= _opts.limit) { + if (_worstCount >= this->_opts.limit) { if (!_haveCutoff || less(_worstSeen, _cutoff)) { _cutoff = _worstSeen; _haveCutoff = true; } _worstCount = 0; } - if (_medianCount >= _opts.limit) { + if (_medianCount >= this->_opts.limit) { if (!_haveCutoff || less(_lastMedian, _cutoff)) { _cutoff = _lastMedian; _haveCutoff = true; @@ -875,17 +803,16 @@ private: void spill() { invariant(!_done); - this->_usedDisk = true; if (_data.empty()) return; - if (!_opts.extSortAllowed) { + if (!this->_opts.extSortAllowed) { // This error message only applies to sorts from user queries made through the find or // aggregation commands. Other clients should suppress this error, either by allowing // external sorting or by catching and throwing a more appropriate error. uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, str::stream() - << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes << " bytes, but did not opt in to external sorting. Aborting operation." << " Pass allowDiskUse:true to opt in."); } @@ -896,8 +823,7 @@ private: sort(); updateCutoff(); - SortedFileWriter<Key, Value> writer( - _opts, _fileName, _nextSortedFileWriterOffset, _settings); + SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings); for (size_t i = 0; i < _data.size(); i++) { writer.addAlreadySorted(_data[i].first, _data[i].second); } @@ -906,21 +832,18 @@ private: std::vector<Data>().swap(_data); Iterator* iteratorPtr = writer.done(); - _nextSortedFileWriterOffset = writer.getFileEndOffset(); - _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); + this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } const Comparator _comp; const Settings _settings; - SortOptions _opts; - std::string _fileName; - std::streampos _nextSortedFileWriterOffset = 0; bool _done = false; size_t _memUsed; - std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit. - std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled + + // Data that has not been spilled. Organized as max-heap if size == limit. + std::vector<Data> _data; // See updateCutoff() for a full description of how these members are used. bool _haveCutoff; @@ -933,17 +856,139 @@ private: } // namespace sorter +template <typename Key, typename Value> +Sorter<Key, Value>::Sorter(const SortOptions& opts) + : _opts(opts), + _file(opts.extSortAllowed + ? std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + nextFileName()) + : nullptr) {} + +template <typename Key, typename Value> +Sorter<Key, Value>::Sorter(const SortOptions& opts, const std::string& fileName) + : _opts(opts), + _file(std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + fileName)) { + invariant(opts.extSortAllowed); + invariant(!opts.tempDir.empty()); + invariant(!fileName.empty()); +} + +template <typename Key, typename Value> +Sorter<Key, Value>::File::~File() { + if (_keep) { + return; + } + + if (_file.is_open()) { + DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit)); + DESTRUCTOR_GUARD(_file.close()); + } + + DESTRUCTOR_GUARD(boost::filesystem::remove(_path)); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::read(std::streamoff offset, std::streamsize size, void* out) { + if (!_file.is_open()) { + _open(); + } + + if (_offset != -1) { + _file.exceptions(std::ios::goodbit); + _file.flush(); + _offset = -1; + + uassert(5479100, + str::stream() << "Error flushing file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file); + } + + _file.seekg(offset); + _file.read(reinterpret_cast<char*>(out), size); + + uassert(16817, + str::stream() << "Error reading file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file); + + invariant(_file.gcount() == size, + str::stream() << "Number of bytes read (" << _file.gcount() + << ") not equal to expected number (" << size << ")"); + + uassert(51049, + str::stream() << "Error reading file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file.tellg() >= 0); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::write(const char* data, std::streamsize size) { + _ensureOpenForWriting(); + + try { + _file.write(data, size); + _offset += size; + } catch (const std::system_error& ex) { + if (ex.code() == std::errc::no_space_on_device) { + uasserted(ErrorCodes::OutOfDiskSpace, + str::stream() << ex.what() << ": " << _path.string()); + } + uasserted(5642403, + str::stream() << "Error writing to file " << _path.string() << ": " + << sorter::myErrnoWithDescription()); + } catch (const std::exception&) { + uasserted(16821, + str::stream() << "Error writing to file " << _path.string() << ": " + << sorter::myErrnoWithDescription()); + } +} + +template <typename Key, typename Value> +std::streamoff Sorter<Key, Value>::File::currentOffset() { + _ensureOpenForWriting(); + return _offset; +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::_open() { + invariant(!_file.is_open()); + + boost::filesystem::create_directories(_path.parent_path()); + + // We open the provided file in append mode so that SortedFileWriter instances can share + // the same file, used serially. We want to share files in order to stay below system + // open file limits. + _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out); + + uassert(16818, + str::stream() << "Error opening file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file.good()); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::_ensureOpenForWriting() { + invariant(_offset != -1 || !_file.is_open()); + + if (_file.is_open()) { + return; + } + + _open(); + _file.exceptions(std::ios::failbit | std::ios::badbit); + _offset = boost::filesystem::file_size(_path); +} + // // SortedFileWriter // template <typename Key, typename Value> -SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, - const std::string& fileName, - const std::streampos fileStartOffset, - const Settings& settings) - : _settings(settings) { - +SortedFileWriter<Key, Value>::SortedFileWriter( + const SortOptions& opts, + std::shared_ptr<typename Sorter<Key, Value>::File> file, + const Settings& settings) + : _settings(settings), _file(std::move(file)), _fileStartOffset(_file->currentOffset()) { // This should be checked by consumers, but if we get here don't allow writes. uassert( 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos()); @@ -951,26 +996,6 @@ SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, uassert(17148, "Attempting to use external sort without setting SortOptions::tempDir", !opts.tempDir.empty()); - - boost::filesystem::create_directories(opts.tempDir); - - _fileName = fileName; - - // We open the provided file in append mode so that SortedFileWriter instances can share the - // same file, used serially. We want to share files in order to stay below system open file - // limits. - _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out); - uassert(16818, - str::stream() << "error opening file \"" << _fileName - << "\": " << sorter::myErrnoWithDescription(), - _file.good()); - // The file descriptor is positioned at the end of a file when opened in append mode, but - // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass - // in the expected offset to this constructor. - _fileStartOffset = fileStartOffset; - - // throw on failure - _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); } template <typename Key, typename Value> @@ -1028,24 +1053,10 @@ void SortedFileWriter<Key, Value>::spill() { size = resultLen; } - // negative size means compressed + // Negative size means compressed. size = shouldCompress ? -size : size; - try { - _file.write(reinterpret_cast<const char*>(&size), sizeof(size)); - _file.write(outBuffer, std::abs(size)); - } catch (const std::system_error& ex) { - if (ex.code() == std::errc::no_space_on_device) { - msgasserted(ErrorCodes::OutOfDiskSpace, - str::stream() << ex.what() << ": " << _fileName); - } - msgasserted(5642403, - str::stream() << "error writing to file \"" << _fileName - << "\": " << sorter::myErrnoWithDescription()); - } catch (const std::exception&) { - msgasserted(16821, - str::stream() << "error writing to file \"" << _fileName - << "\": " << sorter::myErrnoWithDescription()); - } + _file->write(reinterpret_cast<const char*>(&size), sizeof(size)); + _file->write(outBuffer, std::abs(size)); _buffer.reset(); } @@ -1053,19 +1064,9 @@ void SortedFileWriter<Key, Value>::spill() { template <typename Key, typename Value> SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() { spill(); - std::streampos currentFileOffset = _file.tellp(); - uassert(50980, - str::stream() << "error fetching current file descriptor offset in file \"" << _fileName - << "\": " << sorter::myErrnoWithDescription(), - currentFileOffset >= 0); - - // In case nothing was written to disk, use _fileStartOffset because tellp() may not be - // initialized on all systems upon opening the file. - _fileEndOffset = currentFileOffset < _fileStartOffset ? _fileStartOffset : currentFileOffset; - _file.close(); return new sorter::FileIterator<Key, Value>( - _fileName, _fileStartOffset, _fileEndOffset, _settings, _checksum); + _file, _fileStartOffset, _file->currentOffset(), _settings, _checksum); } // @@ -1076,10 +1077,9 @@ template <typename Key, typename Value> template <typename Comparator> SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, - const std::string& fileName, const SortOptions& opts, const Comparator& comp) { - return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp); + return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp); } template <typename Key, typename Value> diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 8a43c8f91b7..aed12ffdfe2 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -31,6 +31,7 @@ #include <third_party/murmurhash3/MurmurHash3.h> +#include <boost/filesystem/path.hpp> #include <deque> #include <fstream> #include <memory> @@ -174,7 +175,6 @@ public: template <typename Comparator> static SortIteratorInterface* merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, - const std::string& fileName, const SortOptions& opts, const Comparator& comp); @@ -213,30 +213,95 @@ public: typename Value::SorterDeserializeSettings> Settings; + /** + * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or + * reading without any writing), but does not support writing after any reading has been done. + */ + class File { + public: + File(std::string path) : _path(std::move(path)) { + invariant(!_path.empty()); + } + + ~File(); + + const boost::filesystem::path& path() const { + return _path; + } + + /** + * Signals that the on-disk file should not be cleaned up. + */ + void keep() { + _keep = true; + }; + + /** + * Reads the requested data from the file. Cannot write more to the file once this has been + * called. + */ + void read(std::streamoff offset, std::streamsize size, void* out); + + /** + * Writes the given data to the end of the file. Cannot be called after reading. + */ + void write(const char* data, std::streamsize size); + + /** + * Returns the current offset of the end of the file. Cannot be called after reading. + */ + std::streamoff currentOffset(); + + private: + void _open(); + + void _ensureOpenForWriting(); + + boost::filesystem::path _path; + std::fstream _file; + + // The current offset of the end of the file, or -1 if the file either has not yet been + // opened or is already being read. + std::streamoff _offset = -1; + + // Whether to keep the on-disk file even after this in-memory object has been destructed. + bool _keep = false; + }; + + explicit Sorter(const SortOptions& opts); + + /** + * ExtSort-only constructor. fileName is the base name of a file in the temp directory. + */ + Sorter(const SortOptions& opts, const std::string& fileName); + template <typename Comparator> static Sorter* make(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()); virtual void add(const Key&, const Value&) = 0; - /** * Cannot add more data after calling done(). - * - * The returned Iterator must not outlive the Sorter instance, which manages file clean up. */ virtual Iterator* done() = 0; virtual ~Sorter() {} bool usedDisk() const { - return _usedDisk; + return !_iters.empty(); } protected: Sorter() {} // can only be constructed as a base - bool _usedDisk{false}; // Keeps track of whether the sorter used disk or not + virtual void spill() = 0; + + SortOptions _opts; + + std::shared_ptr<File> _file; + + std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled. }; /** @@ -255,8 +320,7 @@ public: Settings; explicit SortedFileWriter(const SortOptions& opts, - const std::string& fileName, - const std::streampos fileStartOffset, + std::shared_ptr<typename Sorter<Key, Value>::File> file, const Settings& settings = Settings()); void addAlreadySorted(const Key&, const Value&); @@ -269,31 +333,20 @@ public: */ Iterator* done(); - /** - * Only call this after done() has been called to set the end offset. - */ - std::streampos getFileEndOffset() { - invariant(!_file.is_open()); - return _fileEndOffset; - } - private: void spill(); const Settings _settings; - std::string _fileName; - std::ofstream _file; + std::shared_ptr<typename Sorter<Key, Value>::File> _file; BufBuilder _buffer; // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator // to ensure data has not been corrupted after reading from disk. uint32_t _checksum = 0; - // Tracks where in the file we started and finished writing the sorted data range so that the - // information can be given to the Iterator in done(), and to the user via getFileEndOffset() - // for the next SortedFileWriter instance using the same file. - std::streampos _fileStartOffset; - std::streampos _fileEndOffset; + // Tracks where in the file we started writing the sorted data range so that the information can + // be given to the Iterator in done(). + std::streamoff _fileStartOffset; }; } // namespace mongo @@ -317,7 +370,6 @@ private: template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \ SortIteratorInterface<Key, Value>::merge<Comparator>( \ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \ - const std::string& fileName, \ const SortOptions& opts, \ const Comparator& comp); \ template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \ diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index de0d4b3952c..bda75dc17af 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -234,7 +234,7 @@ std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N], std::vector<std::shared_ptr<IWIterator>> vec; for (int i = 0; i < N; i++) vec.push_back(std::shared_ptr<IWIterator>(array[i])); - return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir))); + return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir))); } // @@ -285,29 +285,28 @@ public: void run() { unittest::TempDir tempDir("sortedFileWriterTests"); const SortOptions opts = SortOptions().TempDir(tempDir.path()); + auto makeFile = [&] { + return std::make_shared<Sorter<IntWrapper, IntWrapper>::File>(opts.tempDir + "/" + + nextFileName()); + }; + { // small - std::string fileName = opts.tempDir + "/" + nextFileName(); - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile()); sorter.addAlreadySorted(0, 0); sorter.addAlreadySorted(1, -1); sorter.addAlreadySorted(2, -2); sorter.addAlreadySorted(3, -3); sorter.addAlreadySorted(4, -4); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), - make_shared<IntIterator>(0, 5)); - - ASSERT_TRUE(boost::filesystem::remove(fileName)); + std::make_shared<IntIterator>(0, 5)); } { // big - std::string fileName = opts.tempDir + "/" + nextFileName(); - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile()); for (int i = 0; i < 10 * 1000 * 1000; i++) sorter.addAlreadySorted(i, -i); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), - make_shared<IntIterator>(0, 10 * 1000 * 1000)); - - ASSERT_TRUE(boost::filesystem::remove(fileName)); + std::make_shared<IntIterator>(0, 10 * 1000 * 1000)); } ASSERT(boost::filesystem::is_empty(tempDir.path())); @@ -321,7 +320,7 @@ public: { // test empty (no inputs) std::vector<std::shared_ptr<IWIterator>> vec; std::shared_ptr<IWIterator> mergeIter( - IWIterator::merge(vec, "", SortOptions(), IWComparator())); + IWIterator::merge(vec, SortOptions(), IWComparator())); ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>()); } { // test empty (only empty inputs) |