summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2021-09-23 14:40:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-23 14:56:17 +0000
commita9efdaf12d5f8a2c9ba9610d21fafdd0eeb8551e (patch)
tree8f2d986b64a571d317ee093bbc6602ddb9bc4f36
parent0289da9fb06c5af672bf4220176b735dcc910815 (diff)
downloadmongo-a9efdaf12d5f8a2c9ba9610d21fafdd0eeb8551e.tar.gz
SERVER-54791 Use single file descriptor for external sort
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp29
-rw-r--r--src/mongo/db/pipeline/document_source_group.h6
-rw-r--r--src/mongo/db/sorter/sorter.cpp404
-rw-r--r--src/mongo/db/sorter/sorter.h100
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp23
5 files changed, 298 insertions, 264 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index c7bf44e72e0..a82d90b9be3 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -29,7 +29,6 @@
#include "mongo/platform/basic.h"
-#include <boost/filesystem/operations.hpp>
#include <memory>
#include "mongo/db/exec/document_value/document.h"
@@ -377,20 +376,14 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_memoryTracker{pExpCtx->allowDiskUse && !pExpCtx->inMongos,
maxMemoryUsageBytes ? *maxMemoryUsageBytes
: internalDocumentSourceGroupMaxMemoryBytes.load()},
+ // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system.
+ _file(!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)
+ ? std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" +
+ nextFileName())
+ : nullptr),
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
- _spilled(false) {
- if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) {
- // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system.
- _fileName = pExpCtx->tempDir + "/" + nextFileName();
- }
-}
-
-DocumentSourceGroup::~DocumentSourceGroup() {
- if (_ownsFileDeletion) {
- DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
- }
-}
+ _spilled(false) {}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
_accumulatedFields.push_back(accumulationStatement);
@@ -595,11 +588,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
_groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
_sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
- _sortedFiles,
- _fileName,
- SortOptions(),
- SorterComparator(pExpCtx->getValueComparator())));
- _ownsFileDeletion = false;
+ _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
// prepare current to accumulate data
_currentAccumulators.reserve(numAccumulators);
@@ -637,8 +626,7 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
- SortedFileWriter<Value, Value> writer(
- SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset);
+ SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file);
switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i.
case 0: // no values, essentially a distinct
for (size_t i = 0; i < ptrs.size(); i++) {
@@ -667,7 +655,6 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
_groups->clear();
Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
- _nextSortedFileWriterOffset = writer.getFileEndOffset();
return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
}
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index ec202833ead..ee8ad1f2f14 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -200,8 +200,6 @@ private:
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
- ~DocumentSourceGroup();
-
/**
* getNext() dispatches to one of these three depending on what type of $group it is. These
* methods expect '_currentAccumulators' to have been reset before being called, and also expect
@@ -260,9 +258,7 @@ private:
MemoryUsageTracker _memoryTracker;
- std::string _fileName;
- std::streampos _nextSortedFileWriterOffset = 0;
- bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over.
+ std::shared_ptr<Sorter<Value, Value>::File> _file;
std::vector<std::string> _idFieldNames; // used when id is a document
std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 661b2effab0..39639153d7b 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -84,8 +84,6 @@ uint32_t addDataToChecksum(const void* startOfData, size_t sizeOfData, uint32_t
namespace sorter {
-using std::shared_ptr;
-
// We need to use the "real" errno everywhere, not GetLastError() on Windows
inline std::string myErrnoWithDescription() {
int errnoCopy = errno;
@@ -168,42 +166,21 @@ public:
Settings;
typedef std::pair<Key, Value> Data;
- FileIterator(const std::string& fileName,
- std::streampos fileStartOffset,
- std::streampos fileEndOffset,
+ FileIterator(std::shared_ptr<typename Sorter<Key, Value>::File> file,
+ std::streamoff fileStartOffset,
+ std::streamoff fileEndOffset,
const Settings& settings,
const uint32_t checksum)
: _settings(settings),
- _done(false),
- _fileName(fileName),
+ _file(std::move(file)),
_fileStartOffset(fileStartOffset),
+ _fileCurrentOffset(fileStartOffset),
_fileEndOffset(fileEndOffset),
- _originalChecksum(checksum) {
- uassert(16815,
- str::stream() << "unexpected empty file: " << _fileName,
- boost::filesystem::file_size(_fileName) != 0);
- }
+ _originalChecksum(checksum) {}
- void openSource() {
- _file.open(_fileName.c_str(), std::ios::in | std::ios::binary);
- uassert(16814,
- str::stream() << "error opening file \"" << _fileName
- << "\": " << myErrnoWithDescription(),
- _file.good());
- _file.seekg(_fileStartOffset);
- uassert(50979,
- str::stream() << "error seeking starting offset of '" << _fileStartOffset
- << "' in file \"" << _fileName << "\": " << myErrnoWithDescription(),
- _file.good());
- }
+ void openSource() {}
void closeSource() {
- _file.close();
- uassert(50969,
- str::stream() << "error closing file \"" << _fileName
- << "\": " << myErrnoWithDescription(),
- !_file.fail());
-
// If the file iterator reads through all data objects, we can ensure non-corrupt data
// by comparing the newly calculated checksum with the original checksum from the data
// written to disk. Some iterators do not read back all data from the file, which prohibits
@@ -219,13 +196,13 @@ public:
bool more() {
if (!_done)
- fillBufferIfNeeded(); // may change _done
+ _fillBufferIfNeeded(); // may change _done
return !_done;
}
Data next() {
- verify(!_done);
- fillBufferIfNeeded();
+ invariant(!_done);
+ _fillBufferIfNeeded();
const char* startOfNewData = static_cast<const char*>(_bufferReader->pos());
@@ -250,20 +227,20 @@ private:
/**
* Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
*/
- void fillBufferIfNeeded() {
- verify(!_done);
+ void _fillBufferIfNeeded() {
+ invariant(!_done);
if (!_bufferReader || _bufferReader->atEof())
- fillBufferFromDisk();
+ _fillBufferFromDisk();
}
/**
* Tries to read from disk and places any results in _bufferReader. If there is no more data to
* read, then _done is set to true and the function returns immediately.
*/
- void fillBufferFromDisk() {
+ void _fillBufferFromDisk() {
int32_t rawSize;
- read(&rawSize, sizeof(rawSize));
+ _read(&rawSize, sizeof(rawSize));
if (_done)
return;
@@ -272,7 +249,7 @@ private:
int32_t blockSize = std::abs(rawSize);
_buffer.reset(new char[blockSize]);
- read(_buffer.get(), blockSize);
+ _read(_buffer.get(), blockSize);
uassert(16816, "file too short?", !_done);
auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
@@ -316,41 +293,31 @@ private:
/**
* Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
- *
- * Masserts on any file errors
*/
- void read(void* out, size_t size) {
- invariant(_file.is_open());
-
- const std::streampos offset = _file.tellg();
- uassert(51049,
- str::stream() << "error reading file \"" << _fileName
- << "\": " << myErrnoWithDescription(),
- offset >= 0);
-
- if (offset >= _fileEndOffset) {
- invariant(offset == _fileEndOffset);
+ void _read(void* out, size_t size) {
+ if (_fileCurrentOffset == _fileEndOffset) {
_done = true;
return;
}
- _file.read(reinterpret_cast<char*>(out), size);
- uassert(16817,
- str::stream() << "error reading file \"" << _fileName
- << "\": " << myErrnoWithDescription(),
- _file.good());
- verify(_file.gcount() == static_cast<std::streamsize>(size));
+ invariant(_fileCurrentOffset < _fileEndOffset,
+ str::stream() << "Current file offset (" << _fileCurrentOffset
+ << ") greater than end offset (" << _fileEndOffset << ")");
+
+ _file->read(_fileCurrentOffset, size, out);
+ _fileCurrentOffset += size;
}
const Settings _settings;
- bool _done;
+ bool _done = false;
std::unique_ptr<char[]> _buffer;
std::unique_ptr<BufReader> _bufferReader;
- std::string _fileName; // File containing the sorted data range.
- std::streampos _fileStartOffset; // File offset at which the sorted data range starts.
- std::streampos _fileEndOffset; // File offset at which the sorted data range ends.
- std::ifstream _file;
+ std::shared_ptr<typename Sorter<Key, Value>::File>
+ _file; // File containing the sorted data range.
+ std::streamoff _fileStartOffset; // File offset at which the sorted data range starts.
+ std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from.
+ std::streamoff _fileEndOffset; // File offset at which the sorted data range ends.
// Checksum value that is updated with each read of a data object from disk. We can compare
// this value with _originalChecksum to check for data corruption if and only if the
@@ -375,14 +342,12 @@ public:
typedef std::pair<Key, Value> Data;
MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
- const std::string& itersSourceFileName,
const SortOptions& opts,
const Comparator& comp)
: _opts(opts),
_remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
_first(true),
- _greater(comp),
- _itersSourceFileName(itersSourceFileName) {
+ _greater(comp) {
for (size_t i = 0; i < iters.size(); i++) {
iters[i]->openSource();
if (iters[i]->more()) {
@@ -404,11 +369,8 @@ public:
}
~MergeIterator() {
- // Clear the remaining Stream objects first, to close the file handles before deleting the
- // file. Some systems will error closing the file if any file handles are still open.
_current.reset();
_heap.clear();
- DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName));
}
void openSource() {}
@@ -509,7 +471,6 @@ private:
std::shared_ptr<Stream> _current;
std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
STLComparator _greater; // named so calls make sense
- std::string _itersSourceFileName;
};
template <typename Key, typename Value, typename Comparator>
@@ -524,19 +485,8 @@ public:
NoLimitSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
- verify(_opts.limit == 0);
- if (_opts.extSortAllowed) {
- _fileName = _opts.tempDir + "/" + nextFileName();
- }
- }
-
- ~NoLimitSorter() {
- if (!_done) {
- // If done() was never called to return a MergeIterator, then this Sorter still owns
- // file deletion.
- DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
- }
+ : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) {
+ invariant(opts.limit == 0);
}
void add(const Key& key, const Value& val) {
@@ -547,22 +497,20 @@ public:
_memUsed += key.memUsageForSorter();
_memUsed += val.memUsageForSorter();
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
}
Iterator* done() {
- invariant(!_done);
+ invariant(!std::exchange(_done, true));
- if (_iters.empty()) {
+ if (this->_iters.empty()) {
sort();
return new InMemIterator<Key, Value>(_data);
}
spill();
- Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp);
- _done = true;
- return mergeIt;
+ return Iterator::merge(this->_iters, this->_opts, _comp);
}
private:
@@ -581,53 +529,41 @@ private:
void sort() {
STLComparator less(_comp);
std::stable_sort(_data.begin(), _data.end(), less);
-
- // Does 2x more compares than stable_sort
- // TODO test on windows
- // std::sort(_data.begin(), _data.end(), comp);
}
void spill() {
- invariant(!_done);
-
- this->_usedDisk = true;
if (_data.empty())
return;
- if (!_opts.extSortAllowed) {
+ if (!this->_opts.extSortAllowed) {
// This error message only applies to sorts from user queries made through the find or
// aggregation commands. Other clients, such as bulk index builds, should suppress this
// error, either by allowing external sorting or by catching and throwing a more
// appropriate error.
uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
- << " bytes, but did not opt in to external sorting.");
+ str::stream()
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting.");
}
sort();
- SortedFileWriter<Key, Value> writer(
- _opts, _fileName, _nextSortedFileWriterOffset, _settings);
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
for (; !_data.empty(); _data.pop_front()) {
writer.addAlreadySorted(_data.front().first, _data.front().second);
}
Iterator* iteratorPtr = writer.done();
- _nextSortedFileWriterOffset = writer.getFileEndOffset();
- _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
+ this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
const Comparator _comp;
const Settings _settings;
- SortOptions _opts;
- std::string _fileName;
- std::streampos _nextSortedFileWriterOffset = 0;
bool _done = false;
- size_t _memUsed;
- std::deque<Data> _data; // the "current" data
- std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
+ size_t _memUsed = 0;
+ std::deque<Data> _data; // Data that has not been spilled.
};
template <typename Key, typename Value, typename Comparator>
@@ -666,6 +602,10 @@ public:
}
private:
+ void spill() {
+ invariant(false, "LimitOneSorter does not spill to disk");
+ }
+
const Comparator _comp;
Data _best;
bool _haveData; // false at start, set to true on first call to add()
@@ -683,19 +623,15 @@ public:
TopKSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : _comp(comp),
+ : Sorter<Key, Value>(opts),
+ _comp(comp),
_settings(settings),
- _opts(opts),
_memUsed(0),
_haveCutoff(false),
_worstCount(0),
_medianCount(0) {
// This also *works* with limit==1 but LimitOneSorter should be used instead
- verify(_opts.limit > 1);
-
- if (_opts.extSortAllowed) {
- _fileName = _opts.tempDir + "/" + nextFileName();
- }
+ invariant(opts.limit > 1);
// Preallocate a fixed sized vector of the required size if we don't expect it to have a
// major impact on our memory budget. This is the common case with small limits.
@@ -706,21 +642,13 @@ public:
}
}
- ~TopKSorter() {
- if (!_done) {
- // If done() was never called to return a MergeIterator, then this Sorter still owns
- // file deletion.
- DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
- }
- }
-
void add(const Key& key, const Value& val) {
invariant(!_done);
STLComparator less(_comp);
Data contender(key, val);
- if (_data.size() < _opts.limit) {
+ if (_data.size() < this->_opts.limit) {
if (_haveCutoff && !less(contender, _cutoff))
return;
@@ -729,16 +657,16 @@ public:
_memUsed += key.memUsageForSorter();
_memUsed += val.memUsageForSorter();
- if (_data.size() == _opts.limit)
+ if (_data.size() == this->_opts.limit)
std::make_heap(_data.begin(), _data.end(), less);
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
return;
}
- verify(_data.size() == _opts.limit);
+ invariant(_data.size() == this->_opts.limit);
if (!less(contender, _data.front()))
return; // not good enough
@@ -755,18 +683,18 @@ public:
_data.back() = {contender.first.getOwned(), contender.second.getOwned()};
std::push_heap(_data.begin(), _data.end(), less);
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
}
Iterator* done() {
- if (_iters.empty()) {
+ if (this->_iters.empty()) {
sort();
return new InMemIterator<Key, Value>(_data);
}
spill();
- Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp);
+ Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp);
_done = true;
return iterator;
}
@@ -787,7 +715,7 @@ private:
void sort() {
STLComparator less(_comp);
- if (_data.size() == _opts.limit) {
+ if (_data.size() == this->_opts.limit) {
std::sort_heap(_data.begin(), _data.end(), less);
} else {
std::stable_sort(_data.begin(), _data.end(), less);
@@ -856,14 +784,14 @@ private:
// Promote _worstSeen or _lastMedian to _cutoff and reset counters if should.
- if (_worstCount >= _opts.limit) {
+ if (_worstCount >= this->_opts.limit) {
if (!_haveCutoff || less(_worstSeen, _cutoff)) {
_cutoff = _worstSeen;
_haveCutoff = true;
}
_worstCount = 0;
}
- if (_medianCount >= _opts.limit) {
+ if (_medianCount >= this->_opts.limit) {
if (!_haveCutoff || less(_lastMedian, _cutoff)) {
_cutoff = _lastMedian;
_haveCutoff = true;
@@ -875,17 +803,16 @@ private:
void spill() {
invariant(!_done);
- this->_usedDisk = true;
if (_data.empty())
return;
- if (!_opts.extSortAllowed) {
+ if (!this->_opts.extSortAllowed) {
// This error message only applies to sorts from user queries made through the find or
// aggregation commands. Other clients should suppress this error, either by allowing
// external sorting or by catching and throwing a more appropriate error.
uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
str::stream()
- << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
<< " bytes, but did not opt in to external sorting. Aborting operation."
<< " Pass allowDiskUse:true to opt in.");
}
@@ -896,8 +823,7 @@ private:
sort();
updateCutoff();
- SortedFileWriter<Key, Value> writer(
- _opts, _fileName, _nextSortedFileWriterOffset, _settings);
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
for (size_t i = 0; i < _data.size(); i++) {
writer.addAlreadySorted(_data[i].first, _data[i].second);
}
@@ -906,21 +832,18 @@ private:
std::vector<Data>().swap(_data);
Iterator* iteratorPtr = writer.done();
- _nextSortedFileWriterOffset = writer.getFileEndOffset();
- _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
+ this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
const Comparator _comp;
const Settings _settings;
- SortOptions _opts;
- std::string _fileName;
- std::streampos _nextSortedFileWriterOffset = 0;
bool _done = false;
size_t _memUsed;
- std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
- std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
+
+ // Data that has not been spilled. Organized as max-heap if size == limit.
+ std::vector<Data> _data;
// See updateCutoff() for a full description of how these members are used.
bool _haveCutoff;
@@ -933,17 +856,139 @@ private:
} // namespace sorter
+template <typename Key, typename Value>
+Sorter<Key, Value>::Sorter(const SortOptions& opts)
+ : _opts(opts),
+ _file(opts.extSortAllowed
+ ? std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + nextFileName())
+ : nullptr) {}
+
+template <typename Key, typename Value>
+Sorter<Key, Value>::Sorter(const SortOptions& opts, const std::string& fileName)
+ : _opts(opts),
+ _file(std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + fileName)) {
+ invariant(opts.extSortAllowed);
+ invariant(!opts.tempDir.empty());
+ invariant(!fileName.empty());
+}
+
+template <typename Key, typename Value>
+Sorter<Key, Value>::File::~File() {
+ if (_keep) {
+ return;
+ }
+
+ if (_file.is_open()) {
+ DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit));
+ DESTRUCTOR_GUARD(_file.close());
+ }
+
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_path));
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::read(std::streamoff offset, std::streamsize size, void* out) {
+ if (!_file.is_open()) {
+ _open();
+ }
+
+ if (_offset != -1) {
+ _file.exceptions(std::ios::goodbit);
+ _file.flush();
+ _offset = -1;
+
+ uassert(5479100,
+ str::stream() << "Error flushing file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file);
+ }
+
+ _file.seekg(offset);
+ _file.read(reinterpret_cast<char*>(out), size);
+
+ uassert(16817,
+ str::stream() << "Error reading file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file);
+
+ invariant(_file.gcount() == size,
+ str::stream() << "Number of bytes read (" << _file.gcount()
+ << ") not equal to expected number (" << size << ")");
+
+ uassert(51049,
+ str::stream() << "Error reading file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file.tellg() >= 0);
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::write(const char* data, std::streamsize size) {
+ _ensureOpenForWriting();
+
+ try {
+ _file.write(data, size);
+ _offset += size;
+ } catch (const std::system_error& ex) {
+ if (ex.code() == std::errc::no_space_on_device) {
+ uasserted(ErrorCodes::OutOfDiskSpace,
+ str::stream() << ex.what() << ": " << _path.string());
+ }
+ uasserted(5642403,
+ str::stream() << "Error writing to file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription());
+ } catch (const std::exception&) {
+ uasserted(16821,
+ str::stream() << "Error writing to file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription());
+ }
+}
+
+template <typename Key, typename Value>
+std::streamoff Sorter<Key, Value>::File::currentOffset() {
+ _ensureOpenForWriting();
+ return _offset;
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::_open() {
+ invariant(!_file.is_open());
+
+ boost::filesystem::create_directories(_path.parent_path());
+
+ // We open the provided file in append mode so that SortedFileWriter instances can share
+ // the same file, used serially. We want to share files in order to stay below system
+ // open file limits.
+ _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out);
+
+ uassert(16818,
+ str::stream() << "Error opening file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file.good());
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::_ensureOpenForWriting() {
+ invariant(_offset != -1 || !_file.is_open());
+
+ if (_file.is_open()) {
+ return;
+ }
+
+ _open();
+ _file.exceptions(std::ios::failbit | std::ios::badbit);
+ _offset = boost::filesystem::file_size(_path);
+}
+
//
// SortedFileWriter
//
template <typename Key, typename Value>
-SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
- const std::string& fileName,
- const std::streampos fileStartOffset,
- const Settings& settings)
- : _settings(settings) {
-
+SortedFileWriter<Key, Value>::SortedFileWriter(
+ const SortOptions& opts,
+ std::shared_ptr<typename Sorter<Key, Value>::File> file,
+ const Settings& settings)
+ : _settings(settings), _file(std::move(file)), _fileStartOffset(_file->currentOffset()) {
// This should be checked by consumers, but if we get here don't allow writes.
uassert(
16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
@@ -951,26 +996,6 @@ SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
uassert(17148,
"Attempting to use external sort without setting SortOptions::tempDir",
!opts.tempDir.empty());
-
- boost::filesystem::create_directories(opts.tempDir);
-
- _fileName = fileName;
-
- // We open the provided file in append mode so that SortedFileWriter instances can share the
- // same file, used serially. We want to share files in order to stay below system open file
- // limits.
- _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out);
- uassert(16818,
- str::stream() << "error opening file \"" << _fileName
- << "\": " << sorter::myErrnoWithDescription(),
- _file.good());
- // The file descriptor is positioned at the end of a file when opened in append mode, but
- // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass
- // in the expected offset to this constructor.
- _fileStartOffset = fileStartOffset;
-
- // throw on failure
- _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
}
template <typename Key, typename Value>
@@ -1028,24 +1053,10 @@ void SortedFileWriter<Key, Value>::spill() {
size = resultLen;
}
- // negative size means compressed
+ // Negative size means compressed.
size = shouldCompress ? -size : size;
- try {
- _file.write(reinterpret_cast<const char*>(&size), sizeof(size));
- _file.write(outBuffer, std::abs(size));
- } catch (const std::system_error& ex) {
- if (ex.code() == std::errc::no_space_on_device) {
- msgasserted(ErrorCodes::OutOfDiskSpace,
- str::stream() << ex.what() << ": " << _fileName);
- }
- msgasserted(5642403,
- str::stream() << "error writing to file \"" << _fileName
- << "\": " << sorter::myErrnoWithDescription());
- } catch (const std::exception&) {
- msgasserted(16821,
- str::stream() << "error writing to file \"" << _fileName
- << "\": " << sorter::myErrnoWithDescription());
- }
+ _file->write(reinterpret_cast<const char*>(&size), sizeof(size));
+ _file->write(outBuffer, std::abs(size));
_buffer.reset();
}
@@ -1053,19 +1064,9 @@ void SortedFileWriter<Key, Value>::spill() {
template <typename Key, typename Value>
SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
spill();
- std::streampos currentFileOffset = _file.tellp();
- uassert(50980,
- str::stream() << "error fetching current file descriptor offset in file \"" << _fileName
- << "\": " << sorter::myErrnoWithDescription(),
- currentFileOffset >= 0);
-
- // In case nothing was written to disk, use _fileStartOffset because tellp() may not be
- // initialized on all systems upon opening the file.
- _fileEndOffset = currentFileOffset < _fileStartOffset ? _fileStartOffset : currentFileOffset;
- _file.close();
return new sorter::FileIterator<Key, Value>(
- _fileName, _fileStartOffset, _fileEndOffset, _settings, _checksum);
+ _file, _fileStartOffset, _file->currentOffset(), _settings, _checksum);
}
//
@@ -1076,10 +1077,9 @@ template <typename Key, typename Value>
template <typename Comparator>
SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
- const std::string& fileName,
const SortOptions& opts,
const Comparator& comp) {
- return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp);
+ return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);
}
template <typename Key, typename Value>
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 8a43c8f91b7..aed12ffdfe2 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -31,6 +31,7 @@
#include <third_party/murmurhash3/MurmurHash3.h>
+#include <boost/filesystem/path.hpp>
#include <deque>
#include <fstream>
#include <memory>
@@ -174,7 +175,6 @@ public:
template <typename Comparator>
static SortIteratorInterface* merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
- const std::string& fileName,
const SortOptions& opts,
const Comparator& comp);
@@ -213,30 +213,95 @@ public:
typename Value::SorterDeserializeSettings>
Settings;
+ /**
+ * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or
+ * reading without any writing), but does not support writing after any reading has been done.
+ */
+ class File {
+ public:
+ File(std::string path) : _path(std::move(path)) {
+ invariant(!_path.empty());
+ }
+
+ ~File();
+
+ const boost::filesystem::path& path() const {
+ return _path;
+ }
+
+ /**
+ * Signals that the on-disk file should not be cleaned up.
+ */
+ void keep() {
+ _keep = true;
+ };
+
+ /**
+ * Reads the requested data from the file. Cannot write more to the file once this has been
+ * called.
+ */
+ void read(std::streamoff offset, std::streamsize size, void* out);
+
+ /**
+ * Writes the given data to the end of the file. Cannot be called after reading.
+ */
+ void write(const char* data, std::streamsize size);
+
+ /**
+ * Returns the current offset of the end of the file. Cannot be called after reading.
+ */
+ std::streamoff currentOffset();
+
+ private:
+ void _open();
+
+ void _ensureOpenForWriting();
+
+ boost::filesystem::path _path;
+ std::fstream _file;
+
+ // The current offset of the end of the file, or -1 if the file either has not yet been
+ // opened or is already being read.
+ std::streamoff _offset = -1;
+
+ // Whether to keep the on-disk file even after this in-memory object has been destructed.
+ bool _keep = false;
+ };
+
+ explicit Sorter(const SortOptions& opts);
+
+ /**
+ * ExtSort-only constructor. fileName is the base name of a file in the temp directory.
+ */
+ Sorter(const SortOptions& opts, const std::string& fileName);
+
template <typename Comparator>
static Sorter* make(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings());
virtual void add(const Key&, const Value&) = 0;
-
/**
* Cannot add more data after calling done().
- *
- * The returned Iterator must not outlive the Sorter instance, which manages file clean up.
*/
virtual Iterator* done() = 0;
virtual ~Sorter() {}
bool usedDisk() const {
- return _usedDisk;
+ return !_iters.empty();
}
protected:
Sorter() {} // can only be constructed as a base
- bool _usedDisk{false}; // Keeps track of whether the sorter used disk or not
+ virtual void spill() = 0;
+
+ SortOptions _opts;
+
+ std::shared_ptr<File> _file;
+
+ std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled.
};
/**
@@ -255,8 +320,7 @@ public:
Settings;
explicit SortedFileWriter(const SortOptions& opts,
- const std::string& fileName,
- const std::streampos fileStartOffset,
+ std::shared_ptr<typename Sorter<Key, Value>::File> file,
const Settings& settings = Settings());
void addAlreadySorted(const Key&, const Value&);
@@ -269,31 +333,20 @@ public:
*/
Iterator* done();
- /**
- * Only call this after done() has been called to set the end offset.
- */
- std::streampos getFileEndOffset() {
- invariant(!_file.is_open());
- return _fileEndOffset;
- }
-
private:
void spill();
const Settings _settings;
- std::string _fileName;
- std::ofstream _file;
+ std::shared_ptr<typename Sorter<Key, Value>::File> _file;
BufBuilder _buffer;
// Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator
// to ensure data has not been corrupted after reading from disk.
uint32_t _checksum = 0;
- // Tracks where in the file we started and finished writing the sorted data range so that the
- // information can be given to the Iterator in done(), and to the user via getFileEndOffset()
- // for the next SortedFileWriter instance using the same file.
- std::streampos _fileStartOffset;
- std::streampos _fileEndOffset;
+ // Tracks where in the file we started writing the sorted data range so that the information can
+ // be given to the Iterator in done().
+ std::streamoff _fileStartOffset;
};
} // namespace mongo
@@ -317,7 +370,6 @@ private:
template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \
SortIteratorInterface<Key, Value>::merge<Comparator>( \
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \
- const std::string& fileName, \
const SortOptions& opts, \
const Comparator& comp); \
template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index de0d4b3952c..bda75dc17af 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -234,7 +234,7 @@ std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N],
std::vector<std::shared_ptr<IWIterator>> vec;
for (int i = 0; i < N; i++)
vec.push_back(std::shared_ptr<IWIterator>(array[i]));
- return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir)));
+ return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir)));
}
//
@@ -285,29 +285,28 @@ public:
void run() {
unittest::TempDir tempDir("sortedFileWriterTests");
const SortOptions opts = SortOptions().TempDir(tempDir.path());
+ auto makeFile = [&] {
+ return std::make_shared<Sorter<IntWrapper, IntWrapper>::File>(opts.tempDir + "/" +
+ nextFileName());
+ };
+
{ // small
- std::string fileName = opts.tempDir + "/" + nextFileName();
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile());
sorter.addAlreadySorted(0, 0);
sorter.addAlreadySorted(1, -1);
sorter.addAlreadySorted(2, -2);
sorter.addAlreadySorted(3, -3);
sorter.addAlreadySorted(4, -4);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
- make_shared<IntIterator>(0, 5));
-
- ASSERT_TRUE(boost::filesystem::remove(fileName));
+ std::make_shared<IntIterator>(0, 5));
}
{ // big
- std::string fileName = opts.tempDir + "/" + nextFileName();
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile());
for (int i = 0; i < 10 * 1000 * 1000; i++)
sorter.addAlreadySorted(i, -i);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
- make_shared<IntIterator>(0, 10 * 1000 * 1000));
-
- ASSERT_TRUE(boost::filesystem::remove(fileName));
+ std::make_shared<IntIterator>(0, 10 * 1000 * 1000));
}
ASSERT(boost::filesystem::is_empty(tempDir.path()));
@@ -321,7 +320,7 @@ public:
{ // test empty (no inputs)
std::vector<std::shared_ptr<IWIterator>> vec;
std::shared_ptr<IWIterator> mergeIter(
- IWIterator::merge(vec, "", SortOptions(), IWComparator()));
+ IWIterator::merge(vec, SortOptions(), IWComparator()));
ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>());
}
{ // test empty (only empty inputs)