diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2018-09-13 15:09:08 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2018-10-31 08:42:19 -0400 |
commit | 2be7f2677a40a863f336d2964f456c9d87ddc838 (patch) | |
tree | 968b48c9cb77e7669cf129d3592e7461bd2ed7df /src | |
parent | 236c6c28a18210586673097ee436c5b613b6c46f (diff) | |
download | mongo-2be7f2677a40a863f336d2964f456c9d87ddc838.tar.gz |
SERVER-17010 each Sorter instance spills to a single file rather than a new file per spill to disk
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_context_fixture.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.cpp | 333 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 97 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 60 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner_test.cpp | 1 |
12 files changed, 474 insertions, 179 deletions
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 4641d6c865d..1d13f159870 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -752,6 +752,20 @@ SortedDataInterface* AbstractIndexAccessMethod::getSortedDataInterface_forTest() return _newInterface.get(); } +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 indexAccessMethodFileCounter; + return "extsort-index." + std::to_string(indexAccessMethodFileCounter.fetchAndAdd(1)); +} + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h index 009c10b42b0..aeacb8ec71e 100644 --- a/src/mongo/db/pipeline/aggregation_context_fixture.h +++ b/src/mongo/db/pipeline/aggregation_context_fixture.h @@ -36,6 +36,7 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -54,6 +55,8 @@ public: // context. _expCtx = new ExpressionContext(_opCtx.get(), nullptr); _expCtx->ns = std::move(nss); + unittest::TempDir tempDir("AggregationContextFixture"); + _expCtx->tempDir = tempDir.path(); } boost::intrusive_ptr<ExpressionContext> getExpCtx() { diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 8ffe222e559..7251d3cc2ef 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -46,6 +46,44 @@ REGISTER_DOCUMENT_SOURCE(bucketAuto, LiteParsedDocumentSourceDefault::parse, DocumentSourceBucketAuto::createFromBson); +namespace { + +boost::intrusive_ptr<Expression> parseGroupByExpression( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const BSONElement& groupByField, + const VariablesParseState& vps) { + if (groupByField.type() == BSONType::Object && + groupByField.embeddedObject().firstElementFieldName()[0] == '$') { + return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps); + } else if (groupByField.type() == BSONType::String && + groupByField.valueStringData()[0] == '$') { + return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps); + } else { + uasserted( + 40239, + str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed " + "path or an expression object, but found: " + << groupByField.toString(false, false)); + } +} + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceBucketAutoFileCounter; + return "extsort-doc-bucket." + + std::to_string(documentSourceBucketAutoFileCounter.fetchAndAdd(1)); +} + +} // namespace + const char* DocumentSourceBucketAuto::getSourceName() const { return "$bucketAuto"; } @@ -400,28 +438,6 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto( } } -namespace { - -boost::intrusive_ptr<Expression> parseGroupByExpression( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const BSONElement& groupByField, - const VariablesParseState& vps) { - if (groupByField.type() == BSONType::Object && - groupByField.embeddedObject().firstElementFieldName()[0] == '$') { - return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps); - } else if (groupByField.type() == BSONType::String && - groupByField.valueStringData()[0] == '$') { - return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps); - } else { - uasserted( - 40239, - str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed " - "path or an expression object, but found: " - << groupByField.toString(false, false)); - } -} -} // namespace - intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(40240, @@ -486,6 +502,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( return DocumentSourceBucketAuto::create( pExpCtx, groupByExpression, numBuckets.get(), accumulationStatements, granularityRounder); } + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 1866eb06058..440c6c10ee9 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -30,6 +30,8 @@ #include "mongo/platform/basic.h" +#include <boost/filesystem/operations.hpp> + #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/accumulator.h" @@ -41,9 +43,28 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/stdx/memory.h" +#include "mongo/util/destructor_guard.h" namespace mongo { +namespace { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceGroupFileCounter; + return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1)); +} + +} // namespace + using boost::intrusive_ptr; using std::pair; using std::shared_ptr; @@ -345,7 +366,18 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), - _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {} + _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) { + if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) { + // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system. + _fileName = pExpCtx->tempDir + "/" + nextFileName(); + } +} + +DocumentSourceGroup::~DocumentSourceGroup() { + if (_ownsFileDeletion) { + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } +} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { _accumulatedFields.push_back(accumulationStatement); @@ -637,7 +669,11 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( - _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); + _sortedFiles, + _fileName, + SortOptions(), + SorterComparator(pExpCtx->getValueComparator()))); + _ownsFileDeletion = false; // prepare current to accumulate data _currentAccumulators.reserve(numAccumulators); @@ -675,7 +711,8 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); - SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir)); + SortedFileWriter<Value, Value> writer( + SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset); switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i. case 0: // no values, essentially a distinct for (size_t i = 0; i < ptrs.size(); i++) { @@ -703,7 +740,9 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { _groups->clear(); - return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done()); + Sorter<Value, Value>::Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); + return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr); } boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { @@ -1015,6 +1054,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields)); } + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 9df59c10329..a1d6fdad059 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -187,6 +187,8 @@ private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, boost::optional<size_t> maxMemoryUsageBytes = boost::none); + ~DocumentSourceGroup(); + /** * getNext() dispatches to one of these three depending on what type of $group it is. All three * of these methods expect '_currentAccumulators' to have been reset before being called, and @@ -245,6 +247,10 @@ private: bool _doingMerge; size_t _memoryUsageBytes = 0; size_t _maxMemoryUsageBytes; + std::string _fileName; + unsigned int _nextSortedFileWriterOffset = 0; + bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over. + std::vector<std::string> _idFieldNames; // used when id is a document std::vector<boost::intrusive_ptr<Expression>> _idExpressions; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 091fc09880c..bde6dfdeb32 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -258,7 +258,7 @@ protected: expressionContext->tempDir = _tempDir.path(); _group = DocumentSourceGroup::createFromBson(specElement, expressionContext); - assertRoundTrips(_group); + assertRoundTrips(_group, expressionContext); } DocumentSourceGroup* group() { return static_cast<DocumentSourceGroup*>(_group.get()); @@ -277,13 +277,14 @@ protected: private: /** Check that the group's spec round trips. */ - void assertRoundTrips(const intrusive_ptr<DocumentSource>& group) { + void assertRoundTrips(const intrusive_ptr<DocumentSource>& group, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { // We don't check against the spec that generated 'group' originally, because // $const operators may be introduced in the first serialization. BSONObj spec = toBson(group); BSONElement specElement = spec.firstElement(); intrusive_ptr<DocumentSource> generated = - DocumentSourceGroup::createFromBson(specElement, ctx()); + DocumentSourceGroup::createFromBson(specElement, expCtx); ASSERT_BSONOBJ_EQ(spec, toBson(generated)); } std::unique_ptr<QueryTestServiceContext> _queryServiceContext; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index ffe17f9280c..c825a7b9165 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -52,6 +52,7 @@ using std::unique_ptr; using std::vector; namespace { + Value missingToNull(Value maybeMissing) { return maybeMissing.missing() ? Value(BSONNULL) : maybeMissing; } @@ -96,7 +97,23 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) { return Value{std::move(keys)}; } +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 documentSourceSortFileCounter; + return "extsort-doc-source-sort." + + std::to_string(documentSourceSortFileCounter.fetchAndAdd(1)); +} + } // namespace + constexpr StringData DocumentSourceSort::kStageName; DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx) @@ -526,6 +543,7 @@ bool DocumentSourceSort::canRunInParallelBeforeOut( // would generally require merging the streams before producing output. return false; } + } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 917fd6cd862..733008f4c1b 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -59,6 +59,7 @@ #include "mongo/dbtests/dbtests.h" #include "mongo/s/query/cluster_aggregation_planner.h" #include "mongo/unittest/death_test.h" +#include "mongo/unittest/temp_dir.h" namespace mongo { namespace { @@ -101,6 +102,8 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, AggregationRequest request(kTestNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(opCtx.get(), request); + TempDir tempDir("PipelineTest"); + ctx->tempDir = tempDir.path(); // For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the // operations will be able to have a resolved view definition. @@ -1752,6 +1755,8 @@ public: AggregationRequest request(kTestNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(&_opCtx, request); + TempDir tempDir("PipelineTest"); + ctx->tempDir = tempDir.path(); // For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the // operations will be able to have a resolved view definition. diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index 0f44776d2b7..497a4fa8ed1 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -68,6 +68,7 @@ #include "mongo/util/unowned_ptr.h" namespace mongo { + namespace sorter { using std::shared_ptr; @@ -104,19 +105,9 @@ void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) #endif } -/** Ensures a named file is deleted when this object goes out of scope */ -class FileDeleter { -public: - FileDeleter(const std::string& fileName) : _fileName(fileName) {} - ~FileDeleter() { - DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName);) - } - -private: - const std::string _fileName; -}; - -/** Returns results from sorted in-memory storage */ +/** + * Returns results from sorted in-memory storage. + */ template <typename Key, typename Value> class InMemIterator : public SortIteratorInterface<Key, Value> { public: @@ -132,6 +123,9 @@ public: template <typename Container> InMemIterator(const Container& input) : _data(input.begin(), input.end()) {} + void openSource() {} + void closeSource() {} + bool more() { return !_data.empty(); } @@ -145,7 +139,15 @@ private: std::deque<Data> _data; }; -/** Returns results in order from a single file */ +/** + * Returns results from a sorted range within a file. Each instance is given a file name and start + * and end offsets. + * + * This class is NOT responsible for file clean up / deletion. There are openSource() and + * closeSource() functions to ensure the FileIterator is not holding the file open when the file is + * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in + * use elsewhere. + */ template <typename Key, typename Value> class FileIterator : public SortIteratorInterface<Key, Value> { public: @@ -155,48 +157,78 @@ public: typedef std::pair<Key, Value> Data; FileIterator(const std::string& fileName, - const Settings& settings, - std::shared_ptr<FileDeleter> fileDeleter) + unsigned int fileStartOffset, + unsigned int fileEndOffset, + const Settings& settings) : _settings(settings), _done(false), _fileName(fileName), - _fileDeleter(fileDeleter), - _file(_fileName.c_str(), std::ios::in | std::ios::binary) { - massert(16814, + _fileStartOffset(fileStartOffset), + _fileEndOffset(fileEndOffset) { + uassert(16815, + str::stream() << "unexpected empty file: " << _fileName, + boost::filesystem::file_size(_fileName) != 0); + } + + void openSource() { + _file.open(_fileName.c_str(), std::ios::in | std::ios::binary); + uassert(16814, str::stream() << "error opening file \"" << _fileName << "\": " << myErrnoWithDescription(), _file.good()); + _file.seekg(_fileStartOffset); + uassert(50979, + str::stream() << "error seeking starting offset of '" << _fileStartOffset + << "' in file \"" + << _fileName + << "\": " + << myErrnoWithDescription(), + _file.good()); + } - massert(16815, - str::stream() << "unexpected empty file: " << _fileName, - boost::filesystem::file_size(_fileName) != 0); + void closeSource() { + _file.close(); + uassert(50969, + str::stream() << "error closing file \"" << _fileName << "\": " + << myErrnoWithDescription(), + !_file.fail()); } bool more() { if (!_done) - fillIfNeeded(); // may change _done + fillBufferIfNeeded(); // may change _done return !_done; } Data next() { verify(!_done); - fillIfNeeded(); - - // Note: key must be read before value so can't pass directly to Data constructor - auto first = Key::deserializeForSorter(*_reader, _settings.first); - auto second = Value::deserializeForSorter(*_reader, _settings.second); + fillBufferIfNeeded(); + + // Note: calling read() on the _bufferReader buffer in the deserialize function advances the + // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function + // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into + // the Data constructor + auto first = Key::deserializeForSorter(*_bufferReader, _settings.first); + auto second = Value::deserializeForSorter(*_bufferReader, _settings.second); return Data(std::move(first), std::move(second)); } private: - void fillIfNeeded() { + /** + * Attempts to refill the _bufferReader if it is empty. Expects _done to be false. + */ + void fillBufferIfNeeded() { verify(!_done); - if (!_reader || _reader->atEof()) - fill(); + if (!_bufferReader || _bufferReader->atEof()) + fillBufferFromDisk(); } - void fill() { + /** + * Tries to read from disk and places any results in _bufferReader. If there is no more data to + * read, then _done is set to true and the function returns immediately. + */ + void fillBufferFromDisk() { int32_t rawSize; read(&rawSize, sizeof(rawSize)); if (_done) @@ -208,7 +240,7 @@ private: _buffer.reset(new char[blockSize]); read(_buffer.get(), blockSize); - massert(16816, "file too short?", !_done); + uassert(16816, "file too short?", !_done); auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext()); if (encryptionHooks->enabled()) { @@ -220,7 +252,7 @@ private: reinterpret_cast<uint8_t*>(out.get()), blockSize, &outLen); - massert(28841, + uassert(28841, str::stream() << "Failed to unprotect data: " << status.toString(), status.isOK()); blockSize = outLen; @@ -228,70 +260,85 @@ private: } if (!compressed) { - _reader.reset(new BufReader(_buffer.get(), blockSize)); + _bufferReader.reset(new BufReader(_buffer.get(), blockSize)); return; } dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize)); size_t uncompressedSize; - massert(17061, + uassert(17061, "couldn't get uncompressed length", snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize)); std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]); - massert(17062, + uassert(17062, "decompression failed", snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get())); // hold on to decompressed data and throw out compressed data at block exit _buffer.swap(decompressionBuffer); - _reader.reset(new BufReader(_buffer.get(), uncompressedSize)); + _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize)); } - // sets _done to true on EOF - asserts on any other error + /** + * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset. + * + * Masserts on any file errors + */ void read(void* out, size_t size) { - _file.read(reinterpret_cast<char*>(out), size); - if (!_file.good()) { - if (_file.eof()) { - _done = true; - return; - } + invariant(_file.is_open()); - msgasserted(16817, - str::stream() << "error reading file \"" << _fileName << "\": " - << myErrnoWithDescription()); + if (static_cast<unsigned int>(_file.tellg()) >= _fileEndOffset) { + invariant(static_cast<unsigned int>(_file.tellg()) == _fileEndOffset); + _done = true; + return; } + + _file.read(reinterpret_cast<char*>(out), size); + uassert(16817, + str::stream() << "error reading file \"" << _fileName << "\": " + << myErrnoWithDescription(), + _file.good()); verify(_file.gcount() == static_cast<std::streamsize>(size)); } const Settings _settings; bool _done; std::unique_ptr<char[]> _buffer; - std::unique_ptr<BufReader> _reader; - std::string _fileName; - std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file + std::unique_ptr<BufReader> _bufferReader; + std::string _fileName; // File containing the sorted data range. + unsigned int _fileStartOffset; // File offset at which the sorted data range starts. + unsigned int _fileEndOffset; // File offset at which the sorted data range ends. std::ifstream _file; }; -/** Merge-sorts results from 0 or more FileIterators */ +/** + * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted + * ranges within the same file. This class is given the data source file name upon construction and + * is responsible for deleting the data source file upon destruction. + */ template <typename Key, typename Value, typename Comparator> class MergeIterator : public SortIteratorInterface<Key, Value> { public: typedef SortIteratorInterface<Key, Value> Input; typedef std::pair<Key, Value> Data; - MergeIterator(const std::vector<std::shared_ptr<Input>>& iters, + const std::string& itersSourceFileName, const SortOptions& opts, const Comparator& comp) : _opts(opts), _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()), _first(true), - _greater(comp) { + _greater(comp), + _itersSourceFileName(itersSourceFileName) { for (size_t i = 0; i < iters.size(); i++) { + iters[i]->openSource(); if (iters[i]->more()) { _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i])); + } else { + iters[i]->closeSource(); } } @@ -306,16 +353,22 @@ public: _heap.pop_back(); } + ~MergeIterator() { + // Clear the remaining Stream objects first, to close the file handles before deleting the + // file. Some systems will error closing the file if any file handles are still open. + _current.reset(); + _heap.clear(); + DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName)); + } + + void openSource() {} + void closeSource() {} + bool more() { if (_remaining > 0 && (_first || !_heap.empty() || _current->more())) return true; - // We are done so clean up resources. - // Can't do this in next() due to lifetime guarantees of unowned Data. - _heap.clear(); - _current.reset(); _remaining = 0; - return false; } @@ -331,7 +384,6 @@ public: if (!_current->advance()) { verify(!_heap.empty()); - std::pop_heap(_heap.begin(), _heap.end(), _greater); _current = _heap.back(); _heap.pop_back(); @@ -346,11 +398,22 @@ public: private: - class Stream { // Data + Iterator + /** + * Data iterator over an Input stream. + * + * This class is responsible for closing the Input source upon destruction, unfortunately, + * because that is the path of least resistence to a design change requiring MergeIterator to + * handle eventual deletion of said Input source. + */ + class Stream { public: Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest) : fileNum(fileNum), _current(first), _rest(rest) {} + ~Stream() { + _rest->closeSource(); + } + const Data& current() const { return _current; } @@ -396,6 +459,7 @@ private: std::shared_ptr<Stream> _current; std::vector<std::shared_ptr<Stream>> _heap; // MinHeap STLComparator _greater; // named so calls make sense + std::string _itersSourceFileName; }; template <typename Key, typename Value, typename Comparator> @@ -412,9 +476,22 @@ public: const Settings& settings = Settings()) : _comp(comp), _settings(settings), _opts(opts), _memUsed(0) { verify(_opts.limit == 0); + if (_opts.extSortAllowed) { + _fileName = _opts.tempDir + "/" + nextFileName(); + } + } + + ~NoLimitSorter() { + if (!_done) { + // If done() was never called to return a MergeIterator, then this Sorter still owns + // file deletion. + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } } void add(const Key& key, const Value& val) { + invariant(!_done); + _data.push_back(std::make_pair(key, val)); _memUsed += key.memUsageForSorter(); @@ -425,21 +502,17 @@ public: } Iterator* done() { + invariant(!_done); + if (_iters.empty()) { sort(); return new InMemIterator<Key, Value>(_data); } spill(); - return Iterator::merge(_iters, _opts, _comp); - } - - // TEMP these are here for compatibility. Will be replaced with a general stats API - int numFiles() const { - return _iters.size(); - } - size_t memUsed() const { - return _memUsed; + Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp); + _done = true; + return mergeIt; } private: @@ -465,6 +538,8 @@ private: } void spill() { + invariant(!_done); + this->_usedDisk = true; if (_data.empty()) return; @@ -484,12 +559,15 @@ private: sort(); - SortedFileWriter<Key, Value> writer(_opts, _settings); + SortedFileWriter<Key, Value> writer( + _opts, _fileName, _nextSortedFileWriterOffset, _settings); for (; !_data.empty(); _data.pop_front()) { writer.addAlreadySorted(_data.front().first, _data.front().second); } + Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); - _iters.push_back(std::shared_ptr<Iterator>(writer.done())); + _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } @@ -497,6 +575,9 @@ private: const Comparator _comp; const Settings _settings; SortOptions _opts; + std::string _fileName; + unsigned int _nextSortedFileWriterOffset = 0; + bool _done = false; size_t _memUsed; std::deque<Data> _data; // the "current" data std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled @@ -537,14 +618,6 @@ public: } } - // TEMP these are here for compatibility. Will be replaced with a general stats API - int numFiles() const { - return 0; - } - size_t memUsed() const { - return _best.first.memUsageForSorter() + _best.second.memUsageForSorter(); - } - private: const Comparator _comp; Data _best; @@ -573,6 +646,10 @@ public: // This also *works* with limit==1 but LimitOneSorter should be used instead verify(_opts.limit > 1); + if (_opts.extSortAllowed) { + _fileName = _opts.tempDir + "/" + nextFileName(); + } + // Preallocate a fixed sized vector of the required size if we // don't expect it to have a major impact on our memory budget. // This is the common case with small limits. @@ -581,7 +658,17 @@ public: } } + ~TopKSorter() { + if (!_done) { + // If done() was never called to return a MergeIterator, then this Sorter still owns + // file deletion. + DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName)); + } + } + void add(const Key& key, const Value& val) { + invariant(!_done); + STLComparator less(_comp); Data contender(key, val); @@ -631,15 +718,9 @@ public: } spill(); - return Iterator::merge(_iters, _opts, _comp); - } - - // TEMP these are here for compatibility. Will be replaced with a general stats API - int numFiles() const { - return _iters.size(); - } - size_t memUsed() const { - return _memUsed; + Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp); + _done = true; + return iterator; } private: @@ -744,6 +825,8 @@ private: } void spill() { + invariant(!_done); + this->_usedDisk = true; if (_data.empty()) return; @@ -767,7 +850,8 @@ private: sort(); updateCutoff(); - SortedFileWriter<Key, Value> writer(_opts, _settings); + SortedFileWriter<Key, Value> writer( + _opts, _fileName, _nextSortedFileWriterOffset, _settings); for (size_t i = 0; i < _data.size(); i++) { writer.addAlreadySorted(_data[i].first, _data[i].second); } @@ -775,7 +859,9 @@ private: // clear _data and release backing array's memory std::vector<Data>().swap(_data); - _iters.push_back(std::shared_ptr<Iterator>(writer.done())); + Iterator* iteratorPtr = writer.done(); + _nextSortedFileWriterOffset = writer.getFileEndOffset(); + _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); _memUsed = 0; } @@ -783,6 +869,9 @@ private: const Comparator _comp; const Settings _settings; SortOptions _opts; + std::string _fileName; + unsigned int _nextSortedFileWriterOffset = 0; + bool _done = false; size_t _memUsed; std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit. std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled @@ -796,46 +885,44 @@ private: size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far. }; -inline unsigned nextFileNumber() { - // This is unified across all Sorter types and instances. - static AtomicUInt32 fileCounter; - return fileCounter.fetchAndAdd(1); -} } // namespace sorter // // SortedFileWriter // - template <typename Key, typename Value> -SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, const Settings& settings) +SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, + const std::string& fileName, + const unsigned int fileStartOffset, + const Settings& settings) : _settings(settings) { namespace str = mongoutils::str; // This should be checked by consumers, but if we get here don't allow writes. - massert( + uassert( 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos()); - massert(17148, + uassert(17148, "Attempting to use external sort without setting SortOptions::tempDir", !opts.tempDir.empty()); - { - StringBuilder sb; - sb << opts.tempDir << "/extsort." << sorter::nextFileNumber(); - _fileName = sb.str(); - } - boost::filesystem::create_directories(opts.tempDir); - _file.open(_fileName.c_str(), std::ios::binary | std::ios::out); - massert(16818, + _fileName = fileName; + + // We open the provided file in append mode so that SortedFileWriter instances can share the + // same file, used serially. We want to share files in order to stay below system open file + // limits. + _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out); + uassert(16818, str::stream() << "error opening file \"" << _fileName << "\": " << sorter::myErrnoWithDescription(), _file.good()); - - _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName); + // The file descriptor is positioned at the end of a file when opened in append mode, but + // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass + // in the expected offset to this constructor. + _fileStartOffset = fileStartOffset; // throw on failure _file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit); @@ -881,7 +968,7 @@ void SortedFileWriter<Key, Value>::spill() { reinterpret_cast<uint8_t*>(out.get()), protectedSizeMax, &resultLen); - massert(28842, + uassert(28842, str::stream() << "Failed to compress data: " << status.toString(), status.isOK()); outBuffer = out.get(); @@ -893,7 +980,6 @@ void SortedFileWriter<Key, Value>::spill() { try { _file.write(reinterpret_cast<const char*>(&size), sizeof(size)); _file.write(outBuffer, std::abs(size)); - } catch (const std::exception&) { msgasserted(16821, str::stream() << "error writing to file \"" << _fileName << "\": " @@ -906,8 +992,22 @@ void SortedFileWriter<Key, Value>::spill() { template <typename Key, typename Value> SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() { spill(); + long currentFileOffset = _file.tellp(); + uassert(50980, + str::stream() << "error fetching current file descriptor offset in file \"" << _fileName + << "\": " + << sorter::myErrnoWithDescription(), + currentFileOffset >= 0); + + // In case nothing was written to disk, use _fileStartOffset because tellp() may not be + // initialized on all systems upon opening the file. + _fileEndOffset = static_cast<unsigned int>(currentFileOffset) < _fileStartOffset + ? _fileStartOffset + : static_cast<unsigned int>(currentFileOffset); _file.close(); - return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter); + + return new sorter::FileIterator<Key, Value>( + _fileName, _fileStartOffset, _fileEndOffset, _settings); } // @@ -918,9 +1018,10 @@ template <typename Key, typename Value> template <typename Comparator> SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const std::string& fileName, const SortOptions& opts, const Comparator& comp) { - return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp); + return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp); } template <typename Key, typename Value> @@ -929,11 +1030,11 @@ Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts, const Comparator& comp, const Settings& settings) { // This should be checked by consumers, but if it isn't try to fail early. - massert(16947, + uassert(16947, "Attempting to use external sort from mongos. This is not allowed.", !(isMongos() && opts.extSortAllowed)); - massert(17149, + uassert(17149, "Attempting to use external sort without setting SortOptions::tempDir", !(opts.extSortAllowed && opts.tempDir.empty())); switch (opts.limit) { diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 2a4dbe3db32..9a45f1349f8 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -86,24 +86,28 @@ */ namespace mongo { -namespace sorter { -// Everything in this namespace is internal to the sorter -class FileDeleter; -} /** * Runtime options that control the Sorter's behavior */ struct SortOptions { - unsigned long long limit; /// number of KV pairs to be returned. 0 for no limit. - size_t maxMemoryUsageBytes; /// Approximate. - bool extSortAllowed; /// If false, uassert if more mem needed than allowed. - std::string tempDir; /// Directory to directly place files in. - /// Must be explicitly set if extSortAllowed is true. + // The number of KV pairs to be returned. 0 indicates no limit. + unsigned long long limit; + + // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate. + size_t maxMemoryUsageBytes; + + // Whether we are allowed to spill to disk. If this is false and in-memory exceeds + // maxMemoryUsageBytes, we will uassert. + bool extSortAllowed; + + // Directory into which we place a file when spilling to disk. Must be explicitly set if + // extSortAllowed is true. + std::string tempDir; SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {} - /// Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true) + // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true) SortOptions& Limit(unsigned long long newLimit) { limit = newLimit; @@ -126,7 +130,9 @@ struct SortOptions { } }; -/// This is the output from the sorting framework +/** + * This is the sorted output iterator from the sorting framework. + */ template <typename Key, typename Value> class SortIteratorInterface { MONGO_DISALLOW_COPYING(SortIteratorInterface); @@ -141,18 +147,37 @@ public: virtual ~SortIteratorInterface() {} - /// Returns an iterator that merges the passed in iterators + // Returns an iterator that merges the passed in iterators template <typename Comparator> static SortIteratorInterface* merge( const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const std::string& fileName, const SortOptions& opts, const Comparator& comp); + // Opens and closes the source of data over which this class iterates, if applicable. + virtual void openSource() = 0; + virtual void closeSource() = 0; + protected: SortIteratorInterface() {} // can only be constructed as a base }; -/// This is the main way to input data to the sorting framework +/** + * This is the way to input data to the sorting framework. + * + * Each instance of this class will generate a file name and spill sorted data ranges to that file + * if allowed in its given Settings. If the instance destructs before done() is called, it will + * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for + * file deletion moves to the returned Iterator object, which must then delete the file upon its own + * destruction. + * + * All users of Sorter implementations must define their own nextFileName() function to generate + * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately + * directly included in multiple places, rather than compiled in one place and linked, and so cannot + * itself provide a globally unique ID for file names. See existing function implementations of + * nextFileName() for example. + */ template <typename Key, typename Value> class Sorter { MONGO_DISALLOW_COPYING(Sorter); @@ -170,23 +195,29 @@ public: const Settings& settings = Settings()); virtual void add(const Key&, const Value&) = 0; - virtual Iterator* done() = 0; /// Can't add more data after calling done() + + /** + * Cannot add more data after calling done(). + * + * The returned Iterator must not outlive the Sorter instance, which manages file clean up. + */ + virtual Iterator* done() = 0; virtual ~Sorter() {} - // TEMP these are here for compatibility. Will be replaced with a general stats API bool usedDisk() { return _usedDisk; } - virtual int numFiles() const = 0; - virtual size_t memUsed() const = 0; protected: bool _usedDisk{false}; // Keeps track of whether the sorter used disk or not Sorter() {} // can only be constructed as a base }; -/// Writes pre-sorted data to a sorted file and hands-back an Iterator over that file. +/** + * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file + * range. + */ template <typename Key, typename Value> class SortedFileWriter { MONGO_DISALLOW_COPYING(SortedFileWriter); @@ -197,19 +228,42 @@ public: typename Value::SorterDeserializeSettings> Settings; - explicit SortedFileWriter(const SortOptions& opts, const Settings& settings = Settings()); + explicit SortedFileWriter(const SortOptions& opts, + const std::string& fileName, + const unsigned int fileStartOffset, + const Settings& settings = Settings()); void addAlreadySorted(const Key&, const Value&); - Iterator* done(); /// Can't add more data after calling done() + + /** + * Spills any data remaining in the buffer to disk and then closes the file to which data was + * written. + * + * No more data can be added via addAlreadySorted() after calling done(). + */ + Iterator* done(); + + /** + * Only call this after done() has been called to set the end offset. + */ + unsigned int getFileEndOffset() { + invariant(!_file.is_open()); + return _fileEndOffset; + } private: void spill(); const Settings _settings; std::string _fileName; - std::shared_ptr<sorter::FileDeleter> _fileDeleter; // Must outlive _file std::ofstream _file; BufBuilder _buffer; + + // Tracks where in the file we started and finished writing the sorted data range so that the + // information can be given to the Iterator in done(), and to the user via getFileEndOffset() + // for the next SortedFileWriter instance using the same file. + unsigned int _fileStartOffset; + unsigned int _fileEndOffset; }; } @@ -233,6 +287,7 @@ private: template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \ SortIteratorInterface<Key, Value>::merge<Comparator>( \ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \ + const std::string& fileName, \ const SortOptions& opts, \ const Comparator& comp); \ template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \ diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 449b26daaff..a154912ecce 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -45,12 +45,31 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" +#include <memory> + +namespace mongo { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicUInt32 sorterTestFileCounter; + return "extsort-sorter-test." + std::to_string(sorterTestFileCounter.fetchAndAdd(1)); +} + +} // namespace mongo + // Need access to internal classes #include "mongo/db/sorter/sorter.cpp" -#include <memory> - namespace mongo { + using namespace mongo::sorter; using std::make_shared; using std::pair; @@ -109,6 +128,8 @@ class IntIterator : public IWIterator { public: IntIterator(int start = 0, int stop = INT_MAX, int increment = 1) : _current(start), _increment(increment), _stop(stop) {} + void openSource() {} + void closeSource() {} bool more() { if (_increment == 0) return true; @@ -130,6 +151,8 @@ private: class EmptyIterator : public IWIterator { public: + void openSource() {} + void closeSource() {} bool more() { return false; } @@ -145,6 +168,9 @@ public: verify(limit > 0); } + void openSource() {} + void closeSource() {} + bool more() { return _remaining && _source->more(); } @@ -163,6 +189,8 @@ template <typename It1, typename It2> void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { int iteration; try { + it1->openSource(); + it2->openSource(); for (iteration = 0; true; iteration++) { ASSERT_EQUALS(it1->more(), it2->more()); ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice @@ -174,10 +202,13 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { ASSERT_EQUALS(pair1.first, pair2.first); ASSERT_EQUALS(pair1.second, pair2.second); } - + it1->closeSource(); + it2->closeSource(); } catch (...) { mongo::unittest::log() << "Failure from line " << line << " on iteration " << iteration << std::endl; + it1->closeSource(); + it2->closeSource(); throw; } } @@ -195,10 +226,11 @@ template <typename IteratorPtr, int N> std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N], Direction Dir = ASC, const SortOptions& opts = SortOptions()) { + invariant(!opts.extSortAllowed); std::vector<std::shared_ptr<IWIterator>> vec; for (int i = 0; i < N; i++) vec.push_back(std::shared_ptr<IWIterator>(array[i])); - return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir))); + return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir))); } // @@ -225,6 +257,8 @@ public: class UnsortedIter : public IWIterator { public: UnsortedIter() : _pos(0) {} + void openSource() {} + void closeSource() {} bool more() { return _pos < sizeof(unsorted) / sizeof(unsorted[0]); } @@ -248,7 +282,8 @@ public: unittest::TempDir tempDir("sortedFileWriterTests"); const SortOptions opts = SortOptions().TempDir(tempDir.path()); { // small - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts); + std::string fileName = opts.tempDir + "/" + nextFileName(); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); sorter.addAlreadySorted(0, 0); sorter.addAlreadySorted(1, -1); sorter.addAlreadySorted(2, -2); @@ -256,14 +291,19 @@ public: sorter.addAlreadySorted(4, -4); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), make_shared<IntIterator>(0, 5)); + + ASSERT_TRUE(boost::filesystem::remove(fileName)); } { // big - SortedFileWriter<IntWrapper, IntWrapper> sorter(opts); + std::string fileName = opts.tempDir + "/" + nextFileName(); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0); for (int i = 0; i < 10 * 1000 * 1000; i++) sorter.addAlreadySorted(i, -i); ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), make_shared<IntIterator>(0, 10 * 1000 * 1000)); + + ASSERT_TRUE(boost::filesystem::remove(fileName)); } ASSERT(boost::filesystem::is_empty(tempDir.path())); @@ -277,7 +317,7 @@ public: { // test empty (no inputs) std::vector<std::shared_ptr<IWIterator>> vec; std::shared_ptr<IWIterator> mergeIter( - IWIterator::merge(vec, SortOptions(), IWComparator())); + IWIterator::merge(vec, "", SortOptions(), IWComparator())); ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>()); } { // test empty (only empty inputs) @@ -489,12 +529,6 @@ public: void addData(unowned_ptr<IWSorter> sorter) { for (int i = 0; i < NUM_ITEMS; i++) sorter->add(_array[i], -_array[i]); - - if (typeid(*this) == typeid(LotsOfDataLittleMemory)) { - // don't do this check in subclasses since they may set a limit - ASSERT_GREATER_THAN_OR_EQUALS(static_cast<size_t>(sorter->numFiles()), - (NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT); - } } virtual std::shared_ptr<IWIterator> correct() { diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp index f9fb4d8317e..4e80684b74a 100644 --- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp @@ -73,6 +73,7 @@ public: _expCtx = new ExpressionContextForTest(operationContext(), AggregationRequest{kTestAggregateNss, {}}); _expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>(); + _expCtx->inMongos = true; } boost::intrusive_ptr<ExpressionContext> expCtx() { |