summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDianna Hohensee <dianna.hohensee@10gen.com>2018-09-13 15:09:08 -0400
committerDianna Hohensee <dianna.hohensee@10gen.com>2019-06-11 12:51:49 -0400
commitc1c761dd865308a15ff75748bf111d0a3ce366d6 (patch)
treea385cae66549f4f55657d4ce06f999d981a6d0d5
parent64d8e9e1b12d16b54d6a592bae8110226c491b4e (diff)
downloadmongo-c1c761dd865308a15ff75748bf111d0a3ce366d6.tar.gz
SERVER-17010 each Sorter instance spills to a single file rather than a new file per spill to disk
SERVER-38764 External sorter should use 64-bit integers for file offsets (cherry picked from commit 48d999c08304b6ede2a9d1f9d9db974b59fe97e2)
-rw-r--r--src/mongo/db/index/index_access_method.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp61
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp50
-rw-r--r--src/mongo/db/pipeline/document_source_group.h6
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp27
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp5
-rw-r--r--src/mongo/db/sorter/sorter.cpp337
-rw-r--r--src/mongo/db/sorter/sorter.h98
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp58
10 files changed, 482 insertions, 181 deletions
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 95d3a26f48b..ada7e27ab92 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -587,6 +587,20 @@ void IndexAccessMethod::getKeys(const BSONObj& obj,
}
}
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 indexAccessMethodFileCounter;
+ return "extsort-index." + std::to_string(indexAccessMethodFileCounter.fetchAndAdd(1));
+}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index b02e8d0226f..1550efd0da3 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -44,6 +44,44 @@ REGISTER_DOCUMENT_SOURCE(bucketAuto,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceBucketAuto::createFromBson);
+namespace {
+
+boost::intrusive_ptr<Expression> parseGroupByExpression(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const BSONElement& groupByField,
+ const VariablesParseState& vps) {
+ if (groupByField.type() == BSONType::Object &&
+ groupByField.embeddedObject().firstElementFieldName()[0] == '$') {
+ return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps);
+ } else if (groupByField.type() == BSONType::String &&
+ groupByField.valueStringData()[0] == '$') {
+ return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps);
+ } else {
+ uasserted(
+ 40239,
+ str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed "
+ "path or an expression object, but found: "
+ << groupByField.toString(false, false));
+ }
+}
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceBucketAutoFileCounter;
+ return "extsort-doc-bucket." +
+ std::to_string(documentSourceBucketAutoFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
const char* DocumentSourceBucketAuto::getSourceName() const {
return "$bucketAuto";
}
@@ -405,28 +443,6 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto(
}
}
-namespace {
-
-boost::intrusive_ptr<Expression> parseGroupByExpression(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const BSONElement& groupByField,
- const VariablesParseState& vps) {
- if (groupByField.type() == BSONType::Object &&
- groupByField.embeddedObject().firstElementFieldName()[0] == '$') {
- return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps);
- } else if (groupByField.type() == BSONType::String &&
- groupByField.valueStringData()[0] == '$') {
- return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps);
- } else {
- uasserted(
- 40239,
- str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed "
- "path or an expression object, but found: "
- << groupByField.toString(false, false));
- }
-}
-} // namespace
-
intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(40240,
@@ -496,6 +512,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
accumulationStatements,
granularityRounder);
}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index a70a65c06b9..2324455c969 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -28,6 +28,8 @@
#include "mongo/platform/basic.h"
+#include <boost/filesystem/operations.hpp>
+
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -39,9 +41,28 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/destructor_guard.h"
namespace mongo {
+namespace {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceGroupFileCounter;
+ return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
using boost::intrusive_ptr;
using std::shared_ptr;
using std::pair;
@@ -276,7 +297,18 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
- _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {}
+ _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {
+ if (!pExpCtx->inRouter && (pExpCtx->extSortAllowed || kDebugBuild)) {
+ // We spill to disk in debug mode, regardless of extSortAllowed, to stress the system.
+ _fileName = pExpCtx->tempDir + "/" + nextFileName();
+ }
+}
+
+DocumentSourceGroup::~DocumentSourceGroup() {
+ if (_ownsFileDeletion) {
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
+}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
vFieldName.push_back(accumulationStatement.fieldName);
@@ -575,7 +607,11 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
_groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
_sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
- _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
+ _sortedFiles,
+ _fileName,
+ SortOptions(),
+ SorterComparator(pExpCtx->getValueComparator())));
+ _ownsFileDeletion = false;
// prepare current to accumulate data
_currentAccumulators.reserve(numAccumulators);
@@ -608,7 +644,8 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
- SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir));
+ SortedFileWriter<Value, Value> writer(
+ SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset);
switch (vpAccumulatorFactory.size()) { // same as ptrs[i]->second.size() for all i.
case 0: // no values, essentially a distinct
for (size_t i = 0; i < ptrs.size(); i++) {
@@ -636,7 +673,9 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
_groups->clear();
- return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done());
+ Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
+ return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
}
boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
@@ -878,7 +917,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::getMergeSource() {
return pMerger;
}
-}
+
+} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index e79ce631817..270e44eb269 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -100,6 +100,8 @@ private:
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
+ ~DocumentSourceGroup();
+
/**
* getNext() dispatches to one of these three depending on what type of $group it is. All three
* of these methods expect '_currentAccumulators' to have been reset before being called, and
@@ -161,6 +163,10 @@ private:
size_t _memoryUsageBytes = 0;
size_t _maxMemoryUsageBytes;
std::unique_ptr<Variables> _variables;
+ std::string _fileName;
+ unsigned int _nextSortedFileWriterOffset = 0;
+ bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over.
+
std::vector<std::string> _idFieldNames; // used when id is a document
std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 2e023c2ebc9..12153489b0a 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -221,7 +221,7 @@ protected:
expressionContext->tempDir = _tempDir.path();
_group = DocumentSourceGroup::createFromBson(specElement, expressionContext);
- assertRoundTrips(_group);
+ assertRoundTrips(_group, expressionContext);
}
DocumentSourceGroup* group() {
return static_cast<DocumentSourceGroup*>(_group.get());
@@ -240,13 +240,14 @@ protected:
private:
/** Check that the group's spec round trips. */
- void assertRoundTrips(const intrusive_ptr<DocumentSource>& group) {
+ void assertRoundTrips(const intrusive_ptr<DocumentSource>& group,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
// We don't check against the spec that generated 'group' originally, because
// $const operators may be introduced in the first serialization.
BSONObj spec = toBson(group);
BSONElement specElement = spec.firstElement();
intrusive_ptr<DocumentSource> generated =
- DocumentSourceGroup::createFromBson(specElement, ctx());
+ DocumentSourceGroup::createFromBson(specElement, expCtx);
ASSERT_BSONOBJ_EQ(spec, toBson(generated));
}
std::unique_ptr<QueryTestServiceContext> _queryServiceContext;
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 6554a892016..d8298f6b02e 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -46,6 +46,25 @@ using std::make_pair;
using std::string;
using std::vector;
+namespace {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceSortFileCounter;
+ return "extsort-doc-source-sort." +
+ std::to_string(documentSourceSortFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), _mergingPresorted(false) {}
@@ -294,6 +313,9 @@ public:
return make_pair(_sorter->extractKey(doc), doc);
}
+ void openSource() {}
+ void closeSource() {}
+
private:
DocumentSourceSort* _sorter;
DBClientCursor* _cursor;
@@ -305,7 +327,7 @@ void DocumentSourceSort::populateFromCursors(const vector<DBClientCursor*>& curs
iterators.push_back(std::make_shared<IteratorFromCursor>(this, cursors[i]));
}
- _output.reset(MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this)));
+ _output.reset(MySorter::Iterator::merge(iterators, "", makeSortOptions(), Comparator(*this)));
_populated = true;
}
@@ -373,7 +395,8 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::getMergeSource() {
other->_sort = _sort;
return other;
}
-}
+
+} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index ed4ea7aefee..65989c859a5 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/unittest/temp_dir.h"
namespace mongo {
bool isMongos() {
@@ -94,6 +95,8 @@ public:
AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(&_opCtx, request);
+ TempDir tempDir("PipelineTest");
+ ctx->tempDir = tempDir.path();
// For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the
// operations will be able to have a resolved view definition.
@@ -918,6 +921,8 @@ public:
AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(&_opCtx, request);
+ TempDir tempDir("PipelineTest");
+ ctx->tempDir = tempDir.path();
// For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the
// operations will be able to have a resolved view definition.
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 0165e0adccc..02e1dc57313 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -67,6 +67,7 @@
#include "mongo/util/unowned_ptr.h"
namespace mongo {
+
namespace sorter {
using std::shared_ptr;
@@ -120,19 +121,9 @@ void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs)
#endif
}
-/** Ensures a named file is deleted when this object goes out of scope */
-class FileDeleter {
-public:
- FileDeleter(const std::string& fileName) : _fileName(fileName) {}
- ~FileDeleter() {
- DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName);)
- }
-
-private:
- const std::string _fileName;
-};
-
-/** Returns results from sorted in-memory storage */
+/**
+ * Returns results from sorted in-memory storage.
+ */
template <typename Key, typename Value>
class InMemIterator : public SortIteratorInterface<Key, Value> {
public:
@@ -148,6 +139,9 @@ public:
template <typename Container>
InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
return !_data.empty();
}
@@ -161,7 +155,15 @@ private:
std::deque<Data> _data;
};
-/** Returns results in order from a single file */
+/**
+ * Returns results from a sorted range within a file. Each instance is given a file name and start
+ * and end offsets.
+ *
+ * This class is NOT responsible for file clean up / deletion. There are openSource() and
+ * closeSource() functions to ensure the FileIterator is not holding the file open when the file is
+ * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in
+ * use elsewhere.
+ */
template <typename Key, typename Value>
class FileIterator : public SortIteratorInterface<Key, Value> {
public:
@@ -171,48 +173,78 @@ public:
typedef std::pair<Key, Value> Data;
FileIterator(const std::string& fileName,
- const Settings& settings,
- std::shared_ptr<FileDeleter> fileDeleter)
+ std::streampos fileStartOffset,
+ std::streampos fileEndOffset,
+ const Settings& settings)
: _settings(settings),
_done(false),
_fileName(fileName),
- _fileDeleter(fileDeleter),
- _file(_fileName.c_str(), std::ios::in | std::ios::binary) {
- massert(16814,
+ _fileStartOffset(fileStartOffset),
+ _fileEndOffset(fileEndOffset) {
+ uassert(16815,
+ str::stream() << "unexpected empty file: " << _fileName,
+ boost::filesystem::file_size(_fileName) != 0);
+ }
+
+ void openSource() {
+ _file.open(_fileName.c_str(), std::ios::in | std::ios::binary);
+ uassert(16814,
str::stream() << "error opening file \"" << _fileName << "\": "
<< myErrnoWithDescription(),
_file.good());
+ _file.seekg(_fileStartOffset);
+ uassert(50979,
+ str::stream() << "error seeking starting offset of '" << _fileStartOffset
+ << "' in file \""
+ << _fileName
+ << "\": "
+ << myErrnoWithDescription(),
+ _file.good());
+ }
- massert(16815,
- str::stream() << "unexpected empty file: " << _fileName,
- boost::filesystem::file_size(_fileName) != 0);
+ void closeSource() {
+ _file.close();
+ uassert(50969,
+ str::stream() << "error closing file \"" << _fileName << "\": "
+ << myErrnoWithDescription(),
+ !_file.fail());
}
bool more() {
if (!_done)
- fillIfNeeded(); // may change _done
+ fillBufferIfNeeded(); // may change _done
return !_done;
}
Data next() {
verify(!_done);
- fillIfNeeded();
-
- // Note: key must be read before value so can't pass directly to Data constructor
- auto first = Key::deserializeForSorter(*_reader, _settings.first);
- auto second = Value::deserializeForSorter(*_reader, _settings.second);
+ fillBufferIfNeeded();
+
+ // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
+ // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
+ // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
+ // the Data constructor
+ auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
+ auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
return Data(std::move(first), std::move(second));
}
private:
- void fillIfNeeded() {
+ /**
+ * Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
+ */
+ void fillBufferIfNeeded() {
verify(!_done);
- if (!_reader || _reader->atEof())
- fill();
+ if (!_bufferReader || _bufferReader->atEof())
+ fillBufferFromDisk();
}
- void fill() {
+ /**
+ * Tries to read from disk and places any results in _bufferReader. If there is no more data to
+ * read, then _done is set to true and the function returns immediately.
+ */
+ void fillBufferFromDisk() {
int32_t rawSize;
read(&rawSize, sizeof(rawSize));
if (_done)
@@ -224,7 +256,7 @@ private:
_buffer.reset(new char[blockSize]);
read(_buffer.get(), blockSize);
- massert(16816, "file too short?", !_done);
+ uassert(16816, "file too short?", !_done);
auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
if (encryptionHooks->enabled()) {
@@ -236,7 +268,7 @@ private:
reinterpret_cast<uint8_t*>(out.get()),
blockSize,
&outLen);
- massert(28841,
+ uassert(28841,
str::stream() << "Failed to unprotect data: " << status.toString(),
status.isOK());
blockSize = outLen;
@@ -244,70 +276,91 @@ private:
}
if (!compressed) {
- _reader.reset(new BufReader(_buffer.get(), blockSize));
+ _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
return;
}
dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
size_t uncompressedSize;
- massert(17061,
+ uassert(17061,
"couldn't get uncompressed length",
snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
- massert(17062,
+ uassert(17062,
"decompression failed",
snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));
// hold on to decompressed data and throw out compressed data at block exit
_buffer.swap(decompressionBuffer);
- _reader.reset(new BufReader(_buffer.get(), uncompressedSize));
+ _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
}
- // sets _done to true on EOF - asserts on any other error
+ /**
+ * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
+ *
+ * Masserts on any file errors
+ */
void read(void* out, size_t size) {
- _file.read(reinterpret_cast<char*>(out), size);
- if (!_file.good()) {
- if (_file.eof()) {
- _done = true;
- return;
- }
+ invariant(_file.is_open());
- msgasserted(16817,
- str::stream() << "error reading file \"" << _fileName << "\": "
- << myErrnoWithDescription());
+ const std::streampos offset = _file.tellg();
+ uassert(51049,
+ str::stream() << "error reading file \"" << _fileName << "\": "
+ << myErrnoWithDescription(),
+ offset >= 0);
+
+ if (offset >= _fileEndOffset) {
+ invariant(offset == _fileEndOffset);
+ _done = true;
+ return;
}
+
+ _file.read(reinterpret_cast<char*>(out), size);
+ uassert(16817,
+ str::stream() << "error reading file \"" << _fileName << "\": "
+ << myErrnoWithDescription(),
+ _file.good());
verify(_file.gcount() == static_cast<std::streamsize>(size));
}
const Settings _settings;
bool _done;
std::unique_ptr<char[]> _buffer;
- std::unique_ptr<BufReader> _reader;
- std::string _fileName;
- std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file
+ std::unique_ptr<BufReader> _bufferReader;
+ std::string _fileName; // File containing the sorted data range.
+ std::streampos _fileStartOffset; // File offset at which the sorted data range starts.
+ std::streampos _fileEndOffset; // File offset at which the sorted data range ends.
std::ifstream _file;
};
-/** Merge-sorts results from 0 or more FileIterators */
+/**
+ * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
+ * ranges within the same file. This class is given the data source file name upon construction and
+ * is responsible for deleting the data source file upon destruction.
+ */
template <typename Key, typename Value, typename Comparator>
class MergeIterator : public SortIteratorInterface<Key, Value> {
public:
typedef SortIteratorInterface<Key, Value> Input;
typedef std::pair<Key, Value> Data;
-
MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
+ const std::string& itersSourceFileName,
const SortOptions& opts,
const Comparator& comp)
: _opts(opts),
_remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
_first(true),
- _greater(comp) {
+ _greater(comp),
+ _itersSourceFileName(itersSourceFileName) {
for (size_t i = 0; i < iters.size(); i++) {
+ iters[i]->openSource();
if (iters[i]->more()) {
_heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
+ } else {
+ iters[i]->closeSource();
}
}
@@ -322,16 +375,22 @@ public:
_heap.pop_back();
}
+ ~MergeIterator() {
+ // Clear the remaining Stream objects first, to close the file handles before deleting the
+ // file. Some systems will error closing the file if any file handles are still open.
+ _current.reset();
+ _heap.clear();
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName));
+ }
+
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
return true;
- // We are done so clean up resources.
- // Can't do this in next() due to lifetime guarantees of unowned Data.
- _heap.clear();
- _current.reset();
_remaining = 0;
-
return false;
}
@@ -347,7 +406,6 @@ public:
if (!_current->advance()) {
verify(!_heap.empty());
-
std::pop_heap(_heap.begin(), _heap.end(), _greater);
_current = _heap.back();
_heap.pop_back();
@@ -362,11 +420,22 @@ public:
private:
- class Stream { // Data + Iterator
+ /**
+ * Data iterator over an Input stream.
+ *
+ * This class is responsible for closing the Input source upon destruction, unfortunately,
+ * because that is the path of least resistence to a design change requiring MergeIterator to
+ * handle eventual deletion of said Input source.
+ */
+ class Stream {
public:
Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
: fileNum(fileNum), _current(first), _rest(rest) {}
+ ~Stream() {
+ _rest->closeSource();
+ }
+
const Data& current() const {
return _current;
}
@@ -412,6 +481,7 @@ private:
std::shared_ptr<Stream> _current;
std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
STLComparator _greater; // named so calls make sense
+ std::string _itersSourceFileName;
};
template <typename Key, typename Value, typename Comparator>
@@ -428,9 +498,22 @@ public:
const Settings& settings = Settings())
: _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
verify(_opts.limit == 0);
+ if (_opts.extSortAllowed) {
+ _fileName = _opts.tempDir + "/" + nextFileName();
+ }
+ }
+
+ ~NoLimitSorter() {
+ if (!_done) {
+ // If done() was never called to return a MergeIterator, then this Sorter still owns
+ // file deletion.
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
}
void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
_data.push_back(std::make_pair(key, val));
_memUsed += key.memUsageForSorter();
@@ -441,21 +524,17 @@ public:
}
Iterator* done() {
+ invariant(!_done);
+
if (_iters.empty()) {
sort();
return new InMemIterator<Key, Value>(_data);
}
spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
-
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return _iters.size();
- }
- size_t memUsed() const {
- return _memUsed;
+ Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp);
+ _done = true;
+ return mergeIt;
}
private:
@@ -481,6 +560,8 @@ private:
}
void spill() {
+ invariant(!_done);
+
if (_data.empty())
return;
@@ -499,12 +580,15 @@ private:
sort();
- SortedFileWriter<Key, Value> writer(_opts, _settings);
+ SortedFileWriter<Key, Value> writer(
+ _opts, _fileName, _nextSortedFileWriterOffset, _settings);
for (; !_data.empty(); _data.pop_front()) {
writer.addAlreadySorted(_data.front().first, _data.front().second);
}
+ Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
@@ -512,6 +596,9 @@ private:
const Comparator _comp;
const Settings _settings;
SortOptions _opts;
+ std::string _fileName;
+ std::streampos _nextSortedFileWriterOffset = 0;
+ bool _done = false;
size_t _memUsed;
std::deque<Data> _data; // the "current" data
std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
@@ -552,14 +639,6 @@ public:
}
}
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return 0;
- }
- size_t memUsed() const {
- return _best.first.memUsageForSorter() + _best.second.memUsageForSorter();
- }
-
private:
const Comparator _comp;
Data _best;
@@ -588,6 +667,10 @@ public:
// This also *works* with limit==1 but LimitOneSorter should be used instead
verify(_opts.limit > 1);
+ if (_opts.extSortAllowed) {
+ _fileName = _opts.tempDir + "/" + nextFileName();
+ }
+
// Preallocate a fixed sized vector of the required size if we
// don't expect it to have a major impact on our memory budget.
// This is the common case with small limits.
@@ -596,7 +679,17 @@ public:
}
}
+ ~TopKSorter() {
+ if (!_done) {
+ // If done() was never called to return a MergeIterator, then this Sorter still owns
+ // file deletion.
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
+ }
+
void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
STLComparator less(_comp);
Data contender(key, val);
@@ -646,15 +739,9 @@ public:
}
spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
-
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return _iters.size();
- }
- size_t memUsed() const {
- return _memUsed;
+ Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp);
+ _done = true;
+ return iterator;
}
private:
@@ -759,6 +846,8 @@ private:
}
void spill() {
+ invariant(!_done);
+
if (_data.empty())
return;
@@ -781,7 +870,8 @@ private:
sort();
updateCutoff();
- SortedFileWriter<Key, Value> writer(_opts, _settings);
+ SortedFileWriter<Key, Value> writer(
+ _opts, _fileName, _nextSortedFileWriterOffset, _settings);
for (size_t i = 0; i < _data.size(); i++) {
writer.addAlreadySorted(_data[i].first, _data[i].second);
}
@@ -789,7 +879,9 @@ private:
// clear _data and release backing array's memory
std::vector<Data>().swap(_data);
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
+ _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
@@ -797,6 +889,9 @@ private:
const Comparator _comp;
const Settings _settings;
SortOptions _opts;
+ std::string _fileName;
+ std::streampos _nextSortedFileWriterOffset = 0;
+ bool _done = false;
size_t _memUsed;
std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
@@ -810,46 +905,44 @@ private:
size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
};
-inline unsigned nextFileNumber() {
- // This is unified across all Sorter types and instances.
- static AtomicUInt32 fileCounter;
- return fileCounter.fetchAndAdd(1);
-}
} // namespace sorter
//
// SortedFileWriter
//
-
template <typename Key, typename Value>
-SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, const Settings& settings)
+SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
+ const std::string& fileName,
+ const std::streampos fileStartOffset,
+ const Settings& settings)
: _settings(settings) {
namespace str = mongoutils::str;
// This should be checked by consumers, but if we get here don't allow writes.
- massert(
+ uassert(
16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
- massert(17148,
+ uassert(17148,
"Attempting to use external sort without setting SortOptions::tempDir",
!opts.tempDir.empty());
- {
- StringBuilder sb;
- sb << opts.tempDir << "/extsort." << sorter::nextFileNumber();
- _fileName = sb.str();
- }
-
boost::filesystem::create_directories(opts.tempDir);
- _file.open(_fileName.c_str(), std::ios::binary | std::ios::out);
- massert(16818,
+ _fileName = fileName;
+
+ // We open the provided file in append mode so that SortedFileWriter instances can share the
+ // same file, used serially. We want to share files in order to stay below system open file
+ // limits.
+ _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out);
+ uassert(16818,
str::stream() << "error opening file \"" << _fileName << "\": "
<< sorter::myErrnoWithDescription(),
_file.good());
-
- _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName);
+ // The file descriptor is positioned at the end of a file when opened in append mode, but
+ // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass
+ // in the expected offset to this constructor.
+ _fileStartOffset = fileStartOffset;
// throw on failure
_file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
@@ -895,7 +988,7 @@ void SortedFileWriter<Key, Value>::spill() {
reinterpret_cast<uint8_t*>(out.get()),
protectedSizeMax,
&resultLen);
- massert(28842,
+ uassert(28842,
str::stream() << "Failed to compress data: " << status.toString(),
status.isOK());
outBuffer = out.get();
@@ -907,7 +1000,6 @@ void SortedFileWriter<Key, Value>::spill() {
try {
_file.write(reinterpret_cast<const char*>(&size), sizeof(size));
_file.write(outBuffer, std::abs(size));
-
} catch (const std::exception&) {
msgasserted(16821,
str::stream() << "error writing to file \"" << _fileName << "\": "
@@ -920,8 +1012,20 @@ void SortedFileWriter<Key, Value>::spill() {
template <typename Key, typename Value>
SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
spill();
+ std::streampos currentFileOffset = _file.tellp();
+ uassert(50980,
+ str::stream() << "error fetching current file descriptor offset in file \"" << _fileName
+ << "\": "
+ << sorter::myErrnoWithDescription(),
+ currentFileOffset >= 0);
+
+ // In case nothing was written to disk, use _fileStartOffset because tellp() may not be
+ // initialized on all systems upon opening the file.
+ _fileEndOffset = currentFileOffset < _fileStartOffset ? _fileStartOffset : currentFileOffset;
_file.close();
- return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter);
+
+ return new sorter::FileIterator<Key, Value>(
+ _fileName, _fileStartOffset, _fileEndOffset, _settings);
}
//
@@ -932,9 +1036,10 @@ template <typename Key, typename Value>
template <typename Comparator>
SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const std::string& fileName,
const SortOptions& opts,
const Comparator& comp) {
- return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);
+ return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp);
}
template <typename Key, typename Value>
@@ -943,11 +1048,11 @@ Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
const Comparator& comp,
const Settings& settings) {
// This should be checked by consumers, but if it isn't try to fail early.
- massert(16947,
+ uassert(16947,
"Attempting to use external sort from mongos. This is not allowed.",
!(isMongos() && opts.extSortAllowed));
- massert(17149,
+ uassert(17149,
"Attempting to use external sort without setting SortOptions::tempDir",
!(opts.extSortAllowed && opts.tempDir.empty()));
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 54f19dd0197..98affdf3152 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -84,24 +84,28 @@
*/
namespace mongo {
-namespace sorter {
-// Everything in this namespace is internal to the sorter
-class FileDeleter;
-}
/**
* Runtime options that control the Sorter's behavior
*/
struct SortOptions {
- unsigned long long limit; /// number of KV pairs to be returned. 0 for no limit.
- size_t maxMemoryUsageBytes; /// Approximate.
- bool extSortAllowed; /// If false, uassert if more mem needed than allowed.
- std::string tempDir; /// Directory to directly place files in.
- /// Must be explicitly set if extSortAllowed is true.
+ // The number of KV pairs to be returned. 0 indicates no limit.
+ unsigned long long limit;
+
+ // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate.
+ size_t maxMemoryUsageBytes;
+
+ // Whether we are allowed to spill to disk. If this is false and in-memory exceeds
+ // maxMemoryUsageBytes, we will uassert.
+ bool extSortAllowed;
+
+ // Directory into which we place a file when spilling to disk. Must be explicitly set if
+ // extSortAllowed is true.
+ std::string tempDir;
SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {}
- /// Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true)
+ // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true)
SortOptions& Limit(unsigned long long newLimit) {
limit = newLimit;
@@ -124,7 +128,9 @@ struct SortOptions {
}
};
-/// This is the output from the sorting framework
+/**
+ * This is the sorted output iterator from the sorting framework.
+ */
template <typename Key, typename Value>
class SortIteratorInterface {
MONGO_DISALLOW_COPYING(SortIteratorInterface);
@@ -139,18 +145,37 @@ public:
virtual ~SortIteratorInterface() {}
- /// Returns an iterator that merges the passed in iterators
+ // Returns an iterator that merges the passed in iterators
template <typename Comparator>
static SortIteratorInterface* merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const std::string& fileName,
const SortOptions& opts,
const Comparator& comp);
+ // Opens and closes the source of data over which this class iterates, if applicable.
+ virtual void openSource() = 0;
+ virtual void closeSource() = 0;
+
protected:
SortIteratorInterface() {} // can only be constructed as a base
};
-/// This is the main way to input data to the sorting framework
+/**
+ * This is the way to input data to the sorting framework.
+ *
+ * Each instance of this class will generate a file name and spill sorted data ranges to that file
+ * if allowed in its given Settings. If the instance destructs before done() is called, it will
+ * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for
+ * file deletion moves to the returned Iterator object, which must then delete the file upon its own
+ * destruction.
+ *
+ * All users of Sorter implementations must define their own nextFileName() function to generate
+ * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately
+ * directly included in multiple places, rather than compiled in one place and linked, and so cannot
+ * itself provide a globally unique ID for file names. See existing function implementations of
+ * nextFileName() for example.
+ */
template <typename Key, typename Value>
class Sorter {
MONGO_DISALLOW_COPYING(Sorter);
@@ -168,19 +193,24 @@ public:
const Settings& settings = Settings());
virtual void add(const Key&, const Value&) = 0;
- virtual Iterator* done() = 0; /// Can't add more data after calling done()
- virtual ~Sorter() {}
+ /**
+ * Cannot add more data after calling done().
+ *
+ * The returned Iterator must not outlive the Sorter instance, which manages file clean up.
+ */
+ virtual Iterator* done() = 0;
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- virtual int numFiles() const = 0;
- virtual size_t memUsed() const = 0;
+ virtual ~Sorter() {}
protected:
Sorter() {} // can only be constructed as a base
};
-/// Writes pre-sorted data to a sorted file and hands-back an Iterator over that file.
+/**
+ * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file
+ * range.
+ */
template <typename Key, typename Value>
class SortedFileWriter {
MONGO_DISALLOW_COPYING(SortedFileWriter);
@@ -191,19 +221,42 @@ public:
typename Value::SorterDeserializeSettings>
Settings;
- explicit SortedFileWriter(const SortOptions& opts, const Settings& settings = Settings());
+ explicit SortedFileWriter(const SortOptions& opts,
+ const std::string& fileName,
+ const std::streampos fileStartOffset,
+ const Settings& settings = Settings());
void addAlreadySorted(const Key&, const Value&);
- Iterator* done(); /// Can't add more data after calling done()
+
+ /**
+ * Spills any data remaining in the buffer to disk and then closes the file to which data was
+ * written.
+ *
+ * No more data can be added via addAlreadySorted() after calling done().
+ */
+ Iterator* done();
+
+ /**
+ * Only call this after done() has been called to set the end offset.
+ */
+ std::streampos getFileEndOffset() {
+ invariant(!_file.is_open());
+ return _fileEndOffset;
+ }
private:
void spill();
const Settings _settings;
std::string _fileName;
- std::shared_ptr<sorter::FileDeleter> _fileDeleter; // Must outlive _file
std::ofstream _file;
BufBuilder _buffer;
+
+ // Tracks where in the file we started and finished writing the sorted data range so that the
+ // information can be given to the Iterator in done(), and to the user via getFileEndOffset()
+ // for the next SortedFileWriter instance using the same file.
+ std::streampos _fileStartOffset;
+ std::streampos _fileEndOffset;
};
}
@@ -227,6 +280,7 @@ private:
template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \
SortIteratorInterface<Key, Value>::merge<Comparator>( \
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \
+ const std::string& fileName, \
const SortOptions& opts, \
const Comparator& comp); \
template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 19171fb13ba..b7a3b6932c6 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -44,10 +44,31 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
+#include <memory>
+
+namespace mongo {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 sorterTestFileCounter;
+ return "extsort-sorter-test." + std::to_string(sorterTestFileCounter.fetchAndAdd(1));
+}
+
+} // namespace mongo
+
// Need access to internal classes
#include "mongo/db/sorter/sorter.cpp"
namespace mongo {
+
using namespace mongo::sorter;
using std::make_shared;
using std::pair;
@@ -118,6 +139,8 @@ class IntIterator : public IWIterator {
public:
IntIterator(int start = 0, int stop = INT_MAX, int increment = 1)
: _current(start), _increment(increment), _stop(stop) {}
+ void openSource() {}
+ void closeSource() {}
bool more() {
if (_increment == 0)
return true;
@@ -139,6 +162,8 @@ private:
class EmptyIterator : public IWIterator {
public:
+ void openSource() {}
+ void closeSource() {}
bool more() {
return false;
}
@@ -154,6 +179,9 @@ public:
verify(limit > 0);
}
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
return _remaining && _source->more();
}
@@ -172,6 +200,8 @@ template <typename It1, typename It2>
void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
int iteration;
try {
+ it1->openSource();
+ it2->openSource();
for (iteration = 0; true; iteration++) {
ASSERT_EQUALS(it1->more(), it2->more());
ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice
@@ -183,10 +213,13 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
ASSERT_EQUALS(pair1.first, pair2.first);
ASSERT_EQUALS(pair1.second, pair2.second);
}
-
+ it1->closeSource();
+ it2->closeSource();
} catch (...) {
mongo::unittest::log() << "Failure from line " << line << " on iteration " << iteration
<< std::endl;
+ it1->closeSource();
+ it2->closeSource();
throw;
}
}
@@ -204,10 +237,11 @@ template <typename IteratorPtr, int N>
std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N],
Direction Dir = ASC,
const SortOptions& opts = SortOptions()) {
+ invariant(!opts.extSortAllowed);
std::vector<std::shared_ptr<IWIterator>> vec;
for (int i = 0; i < N; i++)
vec.push_back(std::shared_ptr<IWIterator>(array[i]));
- return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir)));
+ return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir)));
}
//
@@ -234,6 +268,8 @@ public:
class UnsortedIter : public IWIterator {
public:
UnsortedIter() : _pos(0) {}
+ void openSource() {}
+ void closeSource() {}
bool more() {
return _pos < sizeof(unsorted) / sizeof(unsorted[0]);
}
@@ -257,7 +293,8 @@ public:
unittest::TempDir tempDir("sortedFileWriterTests");
const SortOptions opts = SortOptions().TempDir(tempDir.path());
{ // small
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts);
+ std::string fileName = opts.tempDir + "/" + nextFileName();
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
sorter.addAlreadySorted(0, 0);
sorter.addAlreadySorted(1, -1);
sorter.addAlreadySorted(2, -2);
@@ -265,14 +302,19 @@ public:
sorter.addAlreadySorted(4, -4);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
make_shared<IntIterator>(0, 5));
+
+ ASSERT_TRUE(boost::filesystem::remove(fileName));
}
{ // big
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts);
+ std::string fileName = opts.tempDir + "/" + nextFileName();
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
for (int i = 0; i < 10 * 1000 * 1000; i++)
sorter.addAlreadySorted(i, -i);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
make_shared<IntIterator>(0, 10 * 1000 * 1000));
+
+ ASSERT_TRUE(boost::filesystem::remove(fileName));
}
ASSERT(boost::filesystem::is_empty(tempDir.path()));
@@ -286,7 +328,7 @@ public:
{ // test empty (no inputs)
std::vector<std::shared_ptr<IWIterator>> vec;
std::shared_ptr<IWIterator> mergeIter(
- IWIterator::merge(vec, SortOptions(), IWComparator()));
+ IWIterator::merge(vec, "", SortOptions(), IWComparator()));
ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>());
}
{ // test empty (only empty inputs)
@@ -498,12 +540,6 @@ public:
void addData(unowned_ptr<IWSorter> sorter) {
for (int i = 0; i < NUM_ITEMS; i++)
sorter->add(_array[i], -_array[i]);
-
- if (typeid(*this) == typeid(LotsOfDataLittleMemory)) {
- // don't do this check in subclasses since they may set a limit
- ASSERT_GREATER_THAN_OR_EQUALS(static_cast<size_t>(sorter->numFiles()),
- (NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT);
- }
}
virtual std::shared_ptr<IWIterator> correct() {