diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2018-09-13 15:09:08 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2019-06-11 12:51:49 -0400 |
commit | c1c761dd865308a15ff75748bf111d0a3ce366d6 (patch) | |
tree | a385cae66549f4f55657d4ce06f999d981a6d0d5 /src | |
parent | 64d8e9e1b12d16b54d6a592bae8110226c491b4e (diff) | |
download | mongo-c1c761dd865308a15ff75748bf111d0a3ce366d6.tar.gz |
SERVER-17010 each Sorter instance spills to a single file rather than a new file per spill to disk
SERVER-38764 External sorter should use 64-bit integers for file offsets
(cherry picked from commit 48d999c08304b6ede2a9d1f9d9db974b59fe97e2)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.cpp | 337 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 98 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 58 |
10 files changed, 482 insertions, 181 deletions
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 95d3a26f48b..ada7e27ab92 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -587,6 +587,20 @@ void IndexAccessMethod::getKeys(const BSONObj& obj, } } +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 indexAccessMethodFileCounter; + return "extsort-index." + std::to_string(indexAccessMethodFileCounter.fetchAndAdd(1)); +} + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index b02e8d0226f..1550efd0da3 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -44,6 +44,44 @@ REGISTER_DOCUMENT_SOURCE(bucketAuto, LiteParsedDocumentSourceDefault::parse, DocumentSourceBucketAuto::createFromBson); +namespace { + +boost::intrusive_ptr<Expression> parseGroupByExpression( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const BSONElement& groupByField, + const VariablesParseState& vps) { + if (groupByField.type() == BSONType::Object && + groupByField.embeddedObject().firstElementFieldName()[0] == '$') { + return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps); + } else if (groupByField.type() == BSONType::String && + groupByField.valueStringData()[0] == '$') { + 
return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps); + } else { + uasserted( + 40239, + str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed " + "path or an expression object, but found: " + << groupByField.toString(false, false)); + } +} + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceBucketAutoFileCounter; + return "extsort-doc-bucket." + + std::to_string(documentSourceBucketAutoFileCounter.fetchAndAdd(1)); +} + +} // namespace + const char* DocumentSourceBucketAuto::getSourceName() const { return "$bucketAuto"; } @@ -405,28 +443,6 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto( } } -namespace { - -boost::intrusive_ptr<Expression> parseGroupByExpression( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const BSONElement& groupByField, - const VariablesParseState& vps) { - if (groupByField.type() == BSONType::Object && - groupByField.embeddedObject().firstElementFieldName()[0] == '$') { - return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps); - } else if (groupByField.type() == BSONType::String && - groupByField.valueStringData()[0] == '$') { - return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps); - } else { - uasserted( - 40239, - str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed " - "path or an expression object, but found: " - << groupByField.toString(false, false)); - } -} -} // namespace - 
intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(40240, @@ -496,6 +512,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( accumulationStatements, granularityRounder); } + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index a70a65c06b9..2324455c969 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -28,6 +28,8 @@ #include "mongo/platform/basic.h" +#include <boost/filesystem/operations.hpp> + #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/accumulator.h" @@ -39,9 +41,28 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/stdx/memory.h" +#include "mongo/util/destructor_guard.h" namespace mongo { +namespace { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceGroupFileCounter; + return "extsort-doc-group." 
+ std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1)); +} + +} // namespace + using boost::intrusive_ptr; using std::shared_ptr; using std::pair; @@ -276,7 +297,18 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), - _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {} + _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) { + if (!pExpCtx->inRouter && (pExpCtx->extSortAllowed || kDebugBuild)) { + // We spill to disk in debug mode, regardless of extSortAllowed, to stress the system. + _fileName = pExpCtx->tempDir + "/" + nextFileName(); + } +} + +DocumentSourceGroup::~DocumentSourceGroup() { + if (_ownsFileDeletion) { + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } +} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { vFieldName.push_back(accumulationStatement.fieldName); @@ -575,7 +607,11 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( - _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); + _sortedFiles, + _fileName, + SortOptions(), + SorterComparator(pExpCtx->getValueComparator()))); + _ownsFileDeletion = false; // prepare current to accumulate data _currentAccumulators.reserve(numAccumulators); @@ -608,7 +644,8 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); - SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir)); + SortedFileWriter<Value, Value> writer( + SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset); switch (vpAccumulatorFactory.size()) { // same as 
ptrs[i]->second.size() for all i. case 0: // no values, essentially a distinct for (size_t i = 0; i < ptrs.size(); i++) { @@ -636,7 +673,9 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { _groups->clear(); - return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done()); + Sorter<Value, Value>::Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); + return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr); } boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { @@ -878,7 +917,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::getMergeSource() { return pMerger; } -} + +} // namespace mongo #include "mongo/db/sorter/sorter.cpp" // Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index e79ce631817..270e44eb269 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -100,6 +100,8 @@ private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); + ~DocumentSourceGroup(); + /** * getNext() dispatches to one of these three depending on what type of $group it is. All three * of these methods expect '_currentAccumulators' to have been reset before being called, and @@ -161,6 +163,10 @@ private: size_t _memoryUsageBytes = 0; size_t _maxMemoryUsageBytes; std::unique_ptr<Variables> _variables; + std::string _fileName; + unsigned int _nextSortedFileWriterOffset = 0; + bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over. 
+ std::vector<std::string> _idFieldNames; // used when id is a document std::vector<boost::intrusive_ptr<Expression>> _idExpressions; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 2e023c2ebc9..12153489b0a 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -221,7 +221,7 @@ protected: expressionContext->tempDir = _tempDir.path(); _group = DocumentSourceGroup::createFromBson(specElement, expressionContext); - assertRoundTrips(_group); + assertRoundTrips(_group, expressionContext); } DocumentSourceGroup* group() { return static_cast<DocumentSourceGroup*>(_group.get()); @@ -240,13 +240,14 @@ protected: private: /** Check that the group's spec round trips. */ - void assertRoundTrips(const intrusive_ptr<DocumentSource>& group) { + void assertRoundTrips(const intrusive_ptr<DocumentSource>& group, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { // We don't check against the spec that generated 'group' originally, because // $const operators may be introduced in the first serialization. BSONObj spec = toBson(group); BSONElement specElement = spec.firstElement(); intrusive_ptr<DocumentSource> generated = - DocumentSourceGroup::createFromBson(specElement, ctx()); + DocumentSourceGroup::createFromBson(specElement, expCtx); ASSERT_BSONOBJ_EQ(spec, toBson(generated)); } std::unique_ptr<QueryTestServiceContext> _queryServiceContext; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 6554a892016..d8298f6b02e 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -46,6 +46,25 @@ using std::make_pair; using std::string; using std::vector; +namespace { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. 
+ * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceSortFileCounter; + return "extsort-doc-source-sort." + + std::to_string(documentSourceSortFileCounter.fetchAndAdd(1)); +} + +} // namespace + DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), _mergingPresorted(false) {} @@ -294,6 +313,9 @@ public: return make_pair(_sorter->extractKey(doc), doc); } + void openSource() {} + void closeSource() {} + private: DocumentSourceSort* _sorter; DBClientCursor* _cursor; @@ -305,7 +327,7 @@ void DocumentSourceSort::populateFromCursors(const vector<DBClientCursor*>& curs iterators.push_back(std::make_shared<IteratorFromCursor>(this, cursors[i])); } - _output.reset(MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))); + _output.reset(MySorter::Iterator::merge(iterators, "", makeSortOptions(), Comparator(*this))); _populated = true; } @@ -373,7 +395,8 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::getMergeSource() { other->_sort = _sort; return other; } -} + +} // namespace mongo #include "mongo/db/sorter/sorter.cpp" // Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. 
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index ed4ea7aefee..65989c859a5 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/unittest/temp_dir.h" namespace mongo { bool isMongos() { @@ -94,6 +95,8 @@ public: AggregationRequest request(NamespaceString("a.collection"), rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(&_opCtx, request); + TempDir tempDir("PipelineTest"); + ctx->tempDir = tempDir.path(); // For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the // operations will be able to have a resolved view definition. @@ -918,6 +921,8 @@ public: AggregationRequest request(NamespaceString("a.collection"), rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(&_opCtx, request); + TempDir tempDir("PipelineTest"); + ctx->tempDir = tempDir.path(); // For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the // operations will be able to have a resolved view definition. 
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index 0165e0adccc..02e1dc57313 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -67,6 +67,7 @@ #include "mongo/util/unowned_ptr.h" namespace mongo { + namespace sorter { using std::shared_ptr; @@ -120,19 +121,9 @@ void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) #endif } -/** Ensures a named file is deleted when this object goes out of scope */ -class FileDeleter { -public: - FileDeleter(const std::string& fileName) : _fileName(fileName) {} - ~FileDeleter() { - DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName);) - } - -private: - const std::string _fileName; -}; - -/** Returns results from sorted in-memory storage */ +/** + * Returns results from sorted in-memory storage. + */ template <typename Key, typename Value> class InMemIterator : public SortIteratorInterface<Key, Value> { public: @@ -148,6 +139,9 @@ public: template <typename Container> InMemIterator(const Container& input) : _data(input.begin(), input.end()) {} + void openSource() {} + void closeSource() {} + bool more() { return !_data.empty(); } @@ -161,7 +155,15 @@ private: std::deque<Data> _data; }; -/** Returns results in order from a single file */ +/** + * Returns results from a sorted range within a file. Each instance is given a file name and start + * and end offsets. + * + * This class is NOT responsible for file clean up / deletion. There are openSource() and + * closeSource() functions to ensure the FileIterator is not holding the file open when the file is + * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in + * use elsewhere. 
+ */ template <typename Key, typename Value> class FileIterator : public SortIteratorInterface<Key, Value> { public: @@ -171,48 +173,78 @@ public: typedef std::pair<Key, Value> Data; FileIterator(const std::string& fileName, - const Settings& settings, - std::shared_ptr<FileDeleter> fileDeleter) + std::streampos fileStartOffset, + std::streampos fileEndOffset, + const Settings& settings) : _settings(settings), _done(false), _fileName(fileName), - _fileDeleter(fileDeleter), - _file(_fileName.c_str(), std::ios::in | std::ios::binary) { - massert(16814, + _fileStartOffset(fileStartOffset), + _fileEndOffset(fileEndOffset) { + uassert(16815, + str::stream() << "unexpected empty file: " << _fileName, + boost::filesystem::file_size(_fileName) != 0); + } + + void openSource() { + _file.open(_fileName.c_str(), std::ios::in | std::ios::binary); + uassert(16814, str::stream() << "error opening file \"" << _fileName << "\": " << myErrnoWithDescription(), _file.good()); + _file.seekg(_fileStartOffset); + uassert(50979, + str::stream() << "error seeking starting offset of '" << _fileStartOffset + << "' in file \"" + << _fileName + << "\": " + << myErrnoWithDescription(), + _file.good()); + } - massert(16815, - str::stream() << "unexpected empty file: " << _fileName, - boost::filesystem::file_size(_fileName) != 0); + void closeSource() { + _file.close(); + uassert(50969, + str::stream() << "error closing file \"" << _fileName << "\": " + << myErrnoWithDescription(), + !_file.fail()); } bool more() { if (!_done) - fillIfNeeded(); // may change _done + fillBufferIfNeeded(); // may change _done return !_done; } Data next() { verify(!_done); - fillIfNeeded(); - - // Note: key must be read before value so can't pass directly to Data constructor - auto first = Key::deserializeForSorter(*_reader, _settings.first); - auto second = Value::deserializeForSorter(*_reader, _settings.second); + fillBufferIfNeeded(); + + // Note: calling read() on the _bufferReader buffer in the deserialize 
function advances the + // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function + // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into + // the Data constructor + auto first = Key::deserializeForSorter(*_bufferReader, _settings.first); + auto second = Value::deserializeForSorter(*_bufferReader, _settings.second); return Data(std::move(first), std::move(second)); } private: - void fillIfNeeded() { + /** + * Attempts to refill the _bufferReader if it is empty. Expects _done to be false. + */ + void fillBufferIfNeeded() { verify(!_done); - if (!_reader || _reader->atEof()) - fill(); + if (!_bufferReader || _bufferReader->atEof()) + fillBufferFromDisk(); } - void fill() { + /** + * Tries to read from disk and places any results in _bufferReader. If there is no more data to + * read, then _done is set to true and the function returns immediately. + */ + void fillBufferFromDisk() { int32_t rawSize; read(&rawSize, sizeof(rawSize)); if (_done) @@ -224,7 +256,7 @@ private: _buffer.reset(new char[blockSize]); read(_buffer.get(), blockSize); - massert(16816, "file too short?", !_done); + uassert(16816, "file too short?", !_done); auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext()); if (encryptionHooks->enabled()) { @@ -236,7 +268,7 @@ private: reinterpret_cast<uint8_t*>(out.get()), blockSize, &outLen); - massert(28841, + uassert(28841, str::stream() << "Failed to unprotect data: " << status.toString(), status.isOK()); blockSize = outLen; @@ -244,70 +276,91 @@ private: } if (!compressed) { - _reader.reset(new BufReader(_buffer.get(), blockSize)); + _bufferReader.reset(new BufReader(_buffer.get(), blockSize)); return; } dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize)); size_t uncompressedSize; - massert(17061, + uassert(17061, "couldn't get uncompressed length", snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize)); std::unique_ptr<char[]> 
decompressionBuffer(new char[uncompressedSize]); - massert(17062, + uassert(17062, "decompression failed", snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get())); // hold on to decompressed data and throw out compressed data at block exit _buffer.swap(decompressionBuffer); - _reader.reset(new BufReader(_buffer.get(), uncompressedSize)); + _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize)); } - // sets _done to true on EOF - asserts on any other error + /** + * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset. + * + * Uasserts on any file errors + */ void read(void* out, size_t size) { - _file.read(reinterpret_cast<char*>(out), size); - if (!_file.good()) { - if (_file.eof()) { - _done = true; - return; - } + invariant(_file.is_open()); - msgasserted(16817, - str::stream() << "error reading file \"" << _fileName << "\": " - << myErrnoWithDescription()); + const std::streampos offset = _file.tellg(); + uassert(51049, + str::stream() << "error reading file \"" << _fileName << "\": " + << myErrnoWithDescription(), + offset >= 0); + + if (offset >= _fileEndOffset) { + invariant(offset == _fileEndOffset); + _done = true; + return; } + + _file.read(reinterpret_cast<char*>(out), size); + uassert(16817, + str::stream() << "error reading file \"" << _fileName << "\": " + << myErrnoWithDescription(), + _file.good()); verify(_file.gcount() == static_cast<std::streamsize>(size)); } const Settings _settings; bool _done; std::unique_ptr<char[]> _buffer; - std::unique_ptr<BufReader> _reader; - std::string _fileName; - std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file + std::unique_ptr<BufReader> _bufferReader; + std::string _fileName; // File containing the sorted data range. + std::streampos _fileStartOffset; // File offset at which the sorted data range starts. + std::streampos _fileEndOffset; // File offset at which the sorted data range ends. 
std::ifstream _file; }; -/** Merge-sorts results from 0 or more FileIterators */ +/** + * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted + * ranges within the same file. This class is given the data source file name upon construction and + * is responsible for deleting the data source file upon destruction. + */ template <typename Key, typename Value, typename Comparator> class MergeIterator : public SortIteratorInterface<Key, Value> { public: typedef SortIteratorInterface<Key, Value> Input; typedef std::pair<Key, Value> Data; - MergeIterator(const std::vector<std::shared_ptr<Input>>& iters, + const std::string& itersSourceFileName, const SortOptions& opts, const Comparator& comp) : _opts(opts), _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()), _first(true), - _greater(comp) { + _greater(comp), + _itersSourceFileName(itersSourceFileName) { for (size_t i = 0; i < iters.size(); i++) { + iters[i]->openSource(); if (iters[i]->more()) { _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i])); + } else { + iters[i]->closeSource(); } } @@ -322,16 +375,22 @@ public: _heap.pop_back(); } + ~MergeIterator() { + // Clear the remaining Stream objects first, to close the file handles before deleting the + // file. Some systems will error deleting the file if any file handles are still open. + _current.reset(); + _heap.clear(); + DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName)); + } + + void openSource() {} + void closeSource() {} + bool more() { if (_remaining > 0 && (_first || !_heap.empty() || _current->more())) return true; - // We are done so clean up resources. - // Can't do this in next() due to lifetime guarantees of unowned Data. 
- _heap.clear(); - _current.reset(); _remaining = 0; - return false; } @@ -347,7 +406,6 @@ public: if (!_current->advance()) { verify(!_heap.empty()); - std::pop_heap(_heap.begin(), _heap.end(), _greater); _current = _heap.back(); _heap.pop_back(); @@ -362,11 +420,22 @@ public: private: - class Stream { // Data + Iterator + /** + * Data iterator over an Input stream. + * + * This class is responsible for closing the Input source upon destruction, unfortunately, + * because that is the path of least resistance to a design change requiring MergeIterator to + * handle eventual deletion of said Input source. + */ + class Stream { public: Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest) : fileNum(fileNum), _current(first), _rest(rest) {} + ~Stream() { + _rest->closeSource(); + } + const Data& current() const { return _current; } @@ -412,6 +481,7 @@ private: std::shared_ptr<Stream> _current; std::vector<std::shared_ptr<Stream>> _heap; // MinHeap STLComparator _greater; // named so calls make sense + std::string _itersSourceFileName; }; template <typename Key, typename Value, typename Comparator> @@ -428,9 +498,22 @@ public: const Settings& settings = Settings()) : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) { verify(_opts.limit == 0); + if (_opts.extSortAllowed) { + _fileName = _opts.tempDir + "/" + nextFileName(); + } + } + + ~NoLimitSorter() { + if (!_done) { + // If done() was never called to return a MergeIterator, then this Sorter still owns + // file deletion. + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } } void add(const Key& key, const Value& val) { + invariant(!_done); + _data.push_back(std::make_pair(key, val)); _memUsed += key.memUsageForSorter(); @@ -441,21 +524,17 @@ public: } Iterator* done() { + invariant(!_done); + if (_iters.empty()) { sort(); return new InMemIterator<Key, Value>(_data); } spill(); - return Iterator::merge(_iters, _opts, _comp); - } - - // TEMP these are here for compatibility. 
Will be replaced with a general stats API - int numFiles() const { - return _iters.size(); - } - size_t memUsed() const { - return _memUsed; + Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp); + _done = true; + return mergeIt; } private: @@ -481,6 +560,8 @@ private: } void spill() { + invariant(!_done); + if (_data.empty()) return; @@ -499,12 +580,15 @@ private: sort(); - SortedFileWriter<Key, Value> writer(_opts, _settings); + SortedFileWriter<Key, Value> writer( + _opts, _fileName, _nextSortedFileWriterOffset, _settings); for (; !_data.empty(); _data.pop_front()) { writer.addAlreadySorted(_data.front().first, _data.front().second); } + Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); - _iters.push_back(std::shared_ptr<Iterator>(writer.done())); + _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } @@ -512,6 +596,9 @@ private: const Comparator _comp; const Settings _settings; SortOptions _opts; + std::string _fileName; + std::streampos _nextSortedFileWriterOffset = 0; + bool _done = false; size_t _memUsed; std::deque<Data> _data; // the "current" data std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled @@ -552,14 +639,6 @@ public: } } - // TEMP these are here for compatibility. Will be replaced with a general stats API - int numFiles() const { - return 0; - } - size_t memUsed() const { - return _best.first.memUsageForSorter() + _best.second.memUsageForSorter(); - } - private: const Comparator _comp; Data _best; @@ -588,6 +667,10 @@ public: // This also *works* with limit==1 but LimitOneSorter should be used instead verify(_opts.limit > 1); + if (_opts.extSortAllowed) { + _fileName = _opts.tempDir + "/" + nextFileName(); + } + // Preallocate a fixed sized vector of the required size if we // don't expect it to have a major impact on our memory budget. // This is the common case with small limits. 
@@ -596,7 +679,17 @@ public: } } + ~TopKSorter() { + if (!_done) { + // If done() was never called to return a MergeIterator, then this Sorter still owns + // file deletion. + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } + } + void add(const Key& key, const Value& val) { + invariant(!_done); + STLComparator less(_comp); Data contender(key, val); @@ -646,15 +739,9 @@ public: } spill(); - return Iterator::merge(_iters, _opts, _comp); - } - - // TEMP these are here for compatibility. Will be replaced with a general stats API - int numFiles() const { - return _iters.size(); - } - size_t memUsed() const { - return _memUsed; + Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp); + _done = true; + return iterator; } private: @@ -759,6 +846,8 @@ private: } void spill() { + invariant(!_done); + if (_data.empty()) return; @@ -781,7 +870,8 @@ private: sort(); updateCutoff(); - SortedFileWriter<Key, Value> writer(_opts, _settings); + SortedFileWriter<Key, Value> writer( + _opts, _fileName, _nextSortedFileWriterOffset, _settings); for (size_t i = 0; i < _data.size(); i++) { writer.addAlreadySorted(_data[i].first, _data[i].second); } @@ -789,7 +879,9 @@ private: // clear _data and release backing array's memory std::vector<Data>().swap(_data); - _iters.push_back(std::shared_ptr<Iterator>(writer.done())); + Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); + _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } @@ -797,6 +889,9 @@ private: const Comparator _comp; const Settings _settings; SortOptions _opts; + std::string _fileName; + std::streampos _nextSortedFileWriterOffset = 0; + bool _done = false; size_t _memUsed; std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit. 
std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled @@ -810,46 +905,44 @@ private: size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far. }; -inline unsigned nextFileNumber() { - // This is unified across all Sorter types and instances. - static AtomicUInt32 fileCounter; - return fileCounter.fetchAndAdd(1); -} } // namespace sorter // // SortedFileWriter // - template <typename Key, typename Value> -SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, const Settings& settings) +SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, + const std::string& fileName, + const std::streampos fileStartOffset, + const Settings& settings) : _settings(settings) { namespace str = mongoutils::str; // This should be checked by consumers, but if we get here don't allow writes. - massert( + uassert( 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos()); - massert(17148, + uassert(17148, "Attempting to use external sort without setting SortOptions::tempDir", !opts.tempDir.empty()); - { - StringBuilder sb; - sb << opts.tempDir << "/extsort." << sorter::nextFileNumber(); - _fileName = sb.str(); - } - boost::filesystem::create_directories(opts.tempDir); - _file.open(_fileName.c_str(), std::ios::binary | std::ios::out); - massert(16818, + _fileName = fileName; + + // We open the provided file in append mode so that SortedFileWriter instances can share the + // same file, used serially. We want to share files in order to stay below system open file + // limits. 
+ _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out); + uassert(16818, str::stream() << "error opening file \"" << _fileName << "\": " << sorter::myErrnoWithDescription(), _file.good()); - - _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName); + // The file descriptor is positioned at the end of a file when opened in append mode, but + // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass + // in the expected offset to this constructor. + _fileStartOffset = fileStartOffset; // throw on failure _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); @@ -895,7 +988,7 @@ void SortedFileWriter<Key, Value>::spill() { reinterpret_cast<uint8_t*>(out.get()), protectedSizeMax, &resultLen); - massert(28842, + uassert(28842, str::stream() << "Failed to compress data: " << status.toString(), status.isOK()); outBuffer = out.get(); @@ -907,7 +1000,6 @@ void SortedFileWriter<Key, Value>::spill() { try { _file.write(reinterpret_cast<const char*>(&size), sizeof(size)); _file.write(outBuffer, std::abs(size)); - } catch (const std::exception&) { msgasserted(16821, str::stream() << "error writing to file \"" << _fileName << "\": " @@ -920,8 +1012,20 @@ void SortedFileWriter<Key, Value>::spill() { template <typename Key, typename Value> SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() { spill(); + std::streampos currentFileOffset = _file.tellp(); + uassert(50980, + str::stream() << "error fetching current file descriptor offset in file \"" << _fileName + << "\": " + << sorter::myErrnoWithDescription(), + currentFileOffset >= 0); + + // In case nothing was written to disk, use _fileStartOffset because tellp() may not be + // initialized on all systems upon opening the file. + _fileEndOffset = currentFileOffset < _fileStartOffset ? 
_fileStartOffset : currentFileOffset; _file.close(); - return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter); + + return new sorter::FileIterator<Key, Value>( + _fileName, _fileStartOffset, _fileEndOffset, _settings); } // @@ -932,9 +1036,10 @@ template <typename Key, typename Value> template <typename Comparator> SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const std::string& fileName, const SortOptions& opts, const Comparator& comp) { - return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp); + return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp); } template <typename Key, typename Value> @@ -943,11 +1048,11 @@ Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts, const Comparator& comp, const Settings& settings) { // This should be checked by consumers, but if it isn't try to fail early. - massert(16947, + uassert(16947, "Attempting to use external sort from mongos. This is not allowed.", !(isMongos() && opts.extSortAllowed)); - massert(17149, + uassert(17149, "Attempting to use external sort without setting SortOptions::tempDir", !(opts.extSortAllowed && opts.tempDir.empty())); diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 54f19dd0197..98affdf3152 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -84,24 +84,28 @@ */ namespace mongo { -namespace sorter { -// Everything in this namespace is internal to the sorter -class FileDeleter; -} /** * Runtime options that control the Sorter's behavior */ struct SortOptions { - unsigned long long limit; /// number of KV pairs to be returned. 0 for no limit. - size_t maxMemoryUsageBytes; /// Approximate. - bool extSortAllowed; /// If false, uassert if more mem needed than allowed. - std::string tempDir; /// Directory to directly place files in. 
- /// Must be explicitly set if extSortAllowed is true. + // The number of KV pairs to be returned. 0 indicates no limit. + unsigned long long limit; + + // When in-memory usage exceeds this value, we try to spill to disk. This is approximate. + size_t maxMemoryUsageBytes; + + // Whether we are allowed to spill to disk. If this is false and in-memory usage exceeds + // maxMemoryUsageBytes, we will uassert. + bool extSortAllowed; + + // Directory into which we place a file when spilling to disk. Must be explicitly set if + // extSortAllowed is true. + std::string tempDir; SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {} - /// Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true) + // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true) SortOptions& Limit(unsigned long long newLimit) { limit = newLimit; @@ -124,7 +128,9 @@ struct SortOptions { } }; -/// This is the output from the sorting framework +/** + * This is the sorted output iterator from the sorting framework. + */ template <typename Key, typename Value> class SortIteratorInterface { MONGO_DISALLOW_COPYING(SortIteratorInterface); @@ -139,18 +145,37 @@ public: virtual ~SortIteratorInterface() {} - /// Returns an iterator that merges the passed in iterators + // Returns an iterator that merges the passed in iterators template <typename Comparator> static SortIteratorInterface* merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const std::string& fileName, const SortOptions& opts, const Comparator& comp); + // Opens and closes the source of data over which this class iterates, if applicable. + virtual void openSource() = 0; + virtual void closeSource() = 0; + protected: SortIteratorInterface() {} // can only be constructed as a base }; -/// This is the main way to input data to the sorting framework +/** + * This is the way to input data to the sorting framework. 
+ * + * Each instance of this class will generate a file name and spill sorted data ranges to that file + * if allowed in its given Settings. If the instance destructs before done() is called, it will + * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for + * file deletion moves to the returned Iterator object, which must then delete the file upon its own + * destruction. + * + * All users of Sorter implementations must define their own nextFileName() function to generate + * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately + * directly included in multiple places, rather than compiled in one place and linked, and so cannot + * itself provide a globally unique ID for file names. See existing function implementations of + * nextFileName() for example. + */ template <typename Key, typename Value> class Sorter { MONGO_DISALLOW_COPYING(Sorter); @@ -168,19 +193,24 @@ public: const Settings& settings = Settings()); virtual void add(const Key&, const Value&) = 0; - virtual Iterator* done() = 0; /// Can't add more data after calling done() - virtual ~Sorter() {} + /** + * Cannot add more data after calling done(). + * + * The returned Iterator must not outlive the Sorter instance, which manages file clean up. + */ + virtual Iterator* done() = 0; - // TEMP these are here for compatibility. Will be replaced with a general stats API - virtual int numFiles() const = 0; - virtual size_t memUsed() const = 0; + virtual ~Sorter() {} protected: Sorter() {} // can only be constructed as a base }; -/// Writes pre-sorted data to a sorted file and hands-back an Iterator over that file. +/** + * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file + * range. 
+ */ template <typename Key, typename Value> class SortedFileWriter { MONGO_DISALLOW_COPYING(SortedFileWriter); @@ -191,19 +221,42 @@ public: typename Value::SorterDeserializeSettings> Settings; - explicit SortedFileWriter(const SortOptions& opts, const Settings& settings = Settings()); + explicit SortedFileWriter(const SortOptions& opts, + const std::string& fileName, + const std::streampos fileStartOffset, + const Settings& settings = Settings()); void addAlreadySorted(const Key&, const Value&); - Iterator* done(); /// Can't add more data after calling done() + + /** + * Spills any data remaining in the buffer to disk and then closes the file to which data was + * written. + * + * No more data can be added via addAlreadySorted() after calling done(). + */ + Iterator* done(); + + /** + * Only call this after done() has been called to set the end offset. + */ + std::streampos getFileEndOffset() { + invariant(!_file.is_open()); + return _fileEndOffset; + } private: void spill(); const Settings _settings; std::string _fileName; - std::shared_ptr<sorter::FileDeleter> _fileDeleter; // Must outlive _file std::ofstream _file; BufBuilder _buffer; + + // Tracks where in the file we started and finished writing the sorted data range so that the + // information can be given to the Iterator in done(), and to the user via getFileEndOffset() + // for the next SortedFileWriter instance using the same file. 
+ std::streampos _fileStartOffset; + std::streampos _fileEndOffset; }; } @@ -227,6 +280,7 @@ private: template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \ SortIteratorInterface<Key, Value>::merge<Comparator>( \ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \ + const std::string& fileName, \ const SortOptions& opts, \ const Comparator& comp); \ template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \ diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 19171fb13ba..b7a3b6932c6 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -44,10 +44,31 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" +#include <memory> + +namespace mongo { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 sorterTestFileCounter; + return "extsort-sorter-test." 
+ std::to_string(sorterTestFileCounter.fetchAndAdd(1)); +} + +} // namespace mongo + // Need access to internal classes #include "mongo/db/sorter/sorter.cpp" namespace mongo { + using namespace mongo::sorter; using std::make_shared; using std::pair; @@ -118,6 +139,8 @@ class IntIterator : public IWIterator { public: IntIterator(int start = 0, int stop = INT_MAX, int increment = 1) : _current(start), _increment(increment), _stop(stop) {} + void openSource() {} + void closeSource() {} bool more() { if (_increment == 0) return true; @@ -139,6 +162,8 @@ private: class EmptyIterator : public IWIterator { public: + void openSource() {} + void closeSource() {} bool more() { return false; } @@ -154,6 +179,9 @@ public: verify(limit > 0); } + void openSource() {} + void closeSource() {} + bool more() { return _remaining && _source->more(); } @@ -172,6 +200,8 @@ template <typename It1, typename It2> void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { int iteration; try { + it1->openSource(); + it2->openSource(); for (iteration = 0; true; iteration++) { ASSERT_EQUALS(it1->more(), it2->more()); ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice @@ -183,10 +213,13 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { ASSERT_EQUALS(pair1.first, pair2.first); ASSERT_EQUALS(pair1.second, pair2.second); } - + it1->closeSource(); + it2->closeSource(); } catch (...) 
{ mongo::unittest::log() << "Failure from line " << line << " on iteration " << iteration << std::endl; + it1->closeSource(); + it2->closeSource(); throw; } } @@ -204,10 +237,11 @@ template <typename IteratorPtr, int N> std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N], Direction Dir = ASC, const SortOptions& opts = SortOptions()) { + invariant(!opts.extSortAllowed); std::vector<std::shared_ptr<IWIterator>> vec; for (int i = 0; i < N; i++) vec.push_back(std::shared_ptr<IWIterator>(array[i])); - return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir))); + return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir))); } // @@ -234,6 +268,8 @@ public: class UnsortedIter : public IWIterator { public: UnsortedIter() : _pos(0) {} + void openSource() {} + void closeSource() {} bool more() { return _pos < sizeof(unsorted) / sizeof(unsorted[0]); } @@ -257,7 +293,8 @@ public: unittest::TempDir tempDir("sortedFileWriterTests"); const SortOptions opts = SortOptions().TempDir(tempDir.path()); { // small - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts); + std::string fileName = opts.tempDir + "/" + nextFileName(); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); sorter.addAlreadySorted(0, 0); sorter.addAlreadySorted(1, -1); sorter.addAlreadySorted(2, -2); @@ -265,14 +302,19 @@ public: sorter.addAlreadySorted(4, -4); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), make_shared<IntIterator>(0, 5)); + + ASSERT_TRUE(boost::filesystem::remove(fileName)); } { // big - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts); + std::string fileName = opts.tempDir + "/" + nextFileName(); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); for (int i = 0; i < 10 * 1000 * 1000; i++) sorter.addAlreadySorted(i, -i); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), make_shared<IntIterator>(0, 10 * 1000 * 1000)); + + 
ASSERT_TRUE(boost::filesystem::remove(fileName)); } ASSERT(boost::filesystem::is_empty(tempDir.path())); @@ -286,7 +328,7 @@ public: { // test empty (no inputs) std::vector<std::shared_ptr<IWIterator>> vec; std::shared_ptr<IWIterator> mergeIter( - IWIterator::merge(vec, SortOptions(), IWComparator())); + IWIterator::merge(vec, "", SortOptions(), IWComparator())); ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>()); } { // test empty (only empty inputs) @@ -498,12 +540,6 @@ public: void addData(unowned_ptr<IWSorter> sorter) { for (int i = 0; i < NUM_ITEMS; i++) sorter->add(_array[i], -_array[i]); - - if (typeid(*this) == typeid(LotsOfDataLittleMemory)) { - // don't do this check in subclasses since they may set a limit - ASSERT_GREATER_THAN_OR_EQUALS(static_cast<size_t>(sorter->numFiles()), - (NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT); - } } virtual std::shared_ptr<IWIterator> correct() { |