summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDianna Hohensee <dianna.hohensee@10gen.com>2018-09-13 15:09:08 -0400
committerDianna Hohensee <dianna.hohensee@10gen.com>2018-10-31 08:42:19 -0400
commit2be7f2677a40a863f336d2964f456c9d87ddc838 (patch)
tree968b48c9cb77e7669cf129d3592e7461bd2ed7df /src
parent236c6c28a18210586673097ee436c5b613b6c46f (diff)
downloadmongo-2be7f2677a40a863f336d2964f456c9d87ddc838.tar.gz
SERVER-17010 each Sorter instance spills to a single file rather than a new file per spill to disk
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/index/index_access_method.cpp14
-rw-r--r--src/mongo/db/pipeline/aggregation_context_fixture.h3
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp61
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp48
-rw-r--r--src/mongo/db/pipeline/document_source_group.h6
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp18
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp5
-rw-r--r--src/mongo/db/sorter/sorter.cpp333
-rw-r--r--src/mongo/db/sorter/sorter.h97
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp60
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner_test.cpp1
12 files changed, 474 insertions, 179 deletions
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 4641d6c865d..1d13f159870 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -752,6 +752,20 @@ SortedDataInterface* AbstractIndexAccessMethod::getSortedDataInterface_forTest()
return _newInterface.get();
}
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 indexAccessMethodFileCounter;
+ return "extsort-index." + std::to_string(indexAccessMethodFileCounter.fetchAndAdd(1));
+}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h
index 009c10b42b0..aeacb8ec71e 100644
--- a/src/mongo/db/pipeline/aggregation_context_fixture.h
+++ b/src/mongo/db/pipeline/aggregation_context_fixture.h
@@ -36,6 +36,7 @@
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -54,6 +55,8 @@ public:
// context.
_expCtx = new ExpressionContext(_opCtx.get(), nullptr);
_expCtx->ns = std::move(nss);
+ unittest::TempDir tempDir("AggregationContextFixture");
+ _expCtx->tempDir = tempDir.path();
}
boost::intrusive_ptr<ExpressionContext> getExpCtx() {
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 8ffe222e559..7251d3cc2ef 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -46,6 +46,44 @@ REGISTER_DOCUMENT_SOURCE(bucketAuto,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceBucketAuto::createFromBson);
+namespace {
+
+boost::intrusive_ptr<Expression> parseGroupByExpression(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const BSONElement& groupByField,
+ const VariablesParseState& vps) {
+ if (groupByField.type() == BSONType::Object &&
+ groupByField.embeddedObject().firstElementFieldName()[0] == '$') {
+ return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps);
+ } else if (groupByField.type() == BSONType::String &&
+ groupByField.valueStringData()[0] == '$') {
+ return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps);
+ } else {
+ uasserted(
+ 40239,
+ str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed "
+ "path or an expression object, but found: "
+ << groupByField.toString(false, false));
+ }
+}
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceBucketAutoFileCounter;
+ return "extsort-doc-bucket." +
+ std::to_string(documentSourceBucketAutoFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
const char* DocumentSourceBucketAuto::getSourceName() const {
return "$bucketAuto";
}
@@ -400,28 +438,6 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto(
}
}
-namespace {
-
-boost::intrusive_ptr<Expression> parseGroupByExpression(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const BSONElement& groupByField,
- const VariablesParseState& vps) {
- if (groupByField.type() == BSONType::Object &&
- groupByField.embeddedObject().firstElementFieldName()[0] == '$') {
- return Expression::parseObject(expCtx, groupByField.embeddedObject(), vps);
- } else if (groupByField.type() == BSONType::String &&
- groupByField.valueStringData()[0] == '$') {
- return ExpressionFieldPath::parse(expCtx, groupByField.str(), vps);
- } else {
- uasserted(
- 40239,
- str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed "
- "path or an expression object, but found: "
- << groupByField.toString(false, false));
- }
-}
-} // namespace
-
intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(40240,
@@ -486,6 +502,7 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
return DocumentSourceBucketAuto::create(
pExpCtx, groupByExpression, numBuckets.get(), accumulationStatements, granularityRounder);
}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 1866eb06058..440c6c10ee9 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -30,6 +30,8 @@
#include "mongo/platform/basic.h"
+#include <boost/filesystem/operations.hpp>
+
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -41,9 +43,28 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/destructor_guard.h"
namespace mongo {
+namespace {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceGroupFileCounter;
+ return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
using boost::intrusive_ptr;
using std::pair;
using std::shared_ptr;
@@ -345,7 +366,18 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
- _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {}
+ _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
+ if (!pExpCtx->inMongos && (pExpCtx->allowDiskUse || kDebugBuild)) {
+ // We spill to disk in debug mode, regardless of allowDiskUse, to stress the system.
+ _fileName = pExpCtx->tempDir + "/" + nextFileName();
+ }
+}
+
+DocumentSourceGroup::~DocumentSourceGroup() {
+ if (_ownsFileDeletion) {
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
+}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
_accumulatedFields.push_back(accumulationStatement);
@@ -637,7 +669,11 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
_groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
_sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
- _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
+ _sortedFiles,
+ _fileName,
+ SortOptions(),
+ SorterComparator(pExpCtx->getValueComparator())));
+ _ownsFileDeletion = false;
// prepare current to accumulate data
_currentAccumulators.reserve(numAccumulators);
@@ -675,7 +711,8 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
- SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir));
+ SortedFileWriter<Value, Value> writer(
+ SortOptions().TempDir(pExpCtx->tempDir), _fileName, _nextSortedFileWriterOffset);
switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i.
case 0: // no values, essentially a distinct
for (size_t i = 0; i < ptrs.size(); i++) {
@@ -703,7 +740,9 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
_groups->clear();
- return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done());
+ Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
+ return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
}
boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
@@ -1015,6 +1054,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields));
}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 9df59c10329..a1d6fdad059 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -187,6 +187,8 @@ private:
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
+ ~DocumentSourceGroup();
+
/**
* getNext() dispatches to one of these three depending on what type of $group it is. All three
* of these methods expect '_currentAccumulators' to have been reset before being called, and
@@ -245,6 +247,10 @@ private:
bool _doingMerge;
size_t _memoryUsageBytes = 0;
size_t _maxMemoryUsageBytes;
+ std::string _fileName;
+ unsigned int _nextSortedFileWriterOffset = 0;
+ bool _ownsFileDeletion = true; // unless a MergeIterator is made that takes over.
+
std::vector<std::string> _idFieldNames; // used when id is a document
std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 091fc09880c..bde6dfdeb32 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -258,7 +258,7 @@ protected:
expressionContext->tempDir = _tempDir.path();
_group = DocumentSourceGroup::createFromBson(specElement, expressionContext);
- assertRoundTrips(_group);
+ assertRoundTrips(_group, expressionContext);
}
DocumentSourceGroup* group() {
return static_cast<DocumentSourceGroup*>(_group.get());
@@ -277,13 +277,14 @@ protected:
private:
/** Check that the group's spec round trips. */
- void assertRoundTrips(const intrusive_ptr<DocumentSource>& group) {
+ void assertRoundTrips(const intrusive_ptr<DocumentSource>& group,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
// We don't check against the spec that generated 'group' originally, because
// $const operators may be introduced in the first serialization.
BSONObj spec = toBson(group);
BSONElement specElement = spec.firstElement();
intrusive_ptr<DocumentSource> generated =
- DocumentSourceGroup::createFromBson(specElement, ctx());
+ DocumentSourceGroup::createFromBson(specElement, expCtx);
ASSERT_BSONOBJ_EQ(spec, toBson(generated));
}
std::unique_ptr<QueryTestServiceContext> _queryServiceContext;
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index ffe17f9280c..c825a7b9165 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -52,6 +52,7 @@ using std::unique_ptr;
using std::vector;
namespace {
+
Value missingToNull(Value maybeMissing) {
return maybeMissing.missing() ? Value(BSONNULL) : maybeMissing;
}
@@ -96,7 +97,23 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) {
return Value{std::move(keys)};
}
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 documentSourceSortFileCounter;
+ return "extsort-doc-source-sort." +
+ std::to_string(documentSourceSortFileCounter.fetchAndAdd(1));
+}
+
} // namespace
+
constexpr StringData DocumentSourceSort::kStageName;
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx)
@@ -526,6 +543,7 @@ bool DocumentSourceSort::canRunInParallelBeforeOut(
// would generally require merging the streams before producing output.
return false;
}
+
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 917fd6cd862..733008f4c1b 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -59,6 +59,7 @@
#include "mongo/dbtests/dbtests.h"
#include "mongo/s/query/cluster_aggregation_planner.h"
#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/temp_dir.h"
namespace mongo {
namespace {
@@ -101,6 +102,8 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
AggregationRequest request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(opCtx.get(), request);
+ TempDir tempDir("PipelineTest");
+ ctx->tempDir = tempDir.path();
// For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the
// operations will be able to have a resolved view definition.
@@ -1752,6 +1755,8 @@ public:
AggregationRequest request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(&_opCtx, request);
+ TempDir tempDir("PipelineTest");
+ ctx->tempDir = tempDir.path();
// For $graphLookup and $lookup, we have to populate the resolvedNamespaces so that the
// operations will be able to have a resolved view definition.
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 0f44776d2b7..497a4fa8ed1 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -68,6 +68,7 @@
#include "mongo/util/unowned_ptr.h"
namespace mongo {
+
namespace sorter {
using std::shared_ptr;
@@ -104,19 +105,9 @@ void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs)
#endif
}
-/** Ensures a named file is deleted when this object goes out of scope */
-class FileDeleter {
-public:
- FileDeleter(const std::string& fileName) : _fileName(fileName) {}
- ~FileDeleter() {
- DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName);)
- }
-
-private:
- const std::string _fileName;
-};
-
-/** Returns results from sorted in-memory storage */
+/**
+ * Returns results from sorted in-memory storage.
+ */
template <typename Key, typename Value>
class InMemIterator : public SortIteratorInterface<Key, Value> {
public:
@@ -132,6 +123,9 @@ public:
template <typename Container>
InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
return !_data.empty();
}
@@ -145,7 +139,15 @@ private:
std::deque<Data> _data;
};
-/** Returns results in order from a single file */
+/**
+ * Returns results from a sorted range within a file. Each instance is given a file name and start
+ * and end offsets.
+ *
+ * This class is NOT responsible for file clean up / deletion. There are openSource() and
+ * closeSource() functions to ensure the FileIterator is not holding the file open when the file is
+ * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in
+ * use elsewhere.
+ */
template <typename Key, typename Value>
class FileIterator : public SortIteratorInterface<Key, Value> {
public:
@@ -155,48 +157,78 @@ public:
typedef std::pair<Key, Value> Data;
FileIterator(const std::string& fileName,
- const Settings& settings,
- std::shared_ptr<FileDeleter> fileDeleter)
+ unsigned int fileStartOffset,
+ unsigned int fileEndOffset,
+ const Settings& settings)
: _settings(settings),
_done(false),
_fileName(fileName),
- _fileDeleter(fileDeleter),
- _file(_fileName.c_str(), std::ios::in | std::ios::binary) {
- massert(16814,
+ _fileStartOffset(fileStartOffset),
+ _fileEndOffset(fileEndOffset) {
+ uassert(16815,
+ str::stream() << "unexpected empty file: " << _fileName,
+ boost::filesystem::file_size(_fileName) != 0);
+ }
+
+ void openSource() {
+ _file.open(_fileName.c_str(), std::ios::in | std::ios::binary);
+ uassert(16814,
str::stream() << "error opening file \"" << _fileName << "\": "
<< myErrnoWithDescription(),
_file.good());
+ _file.seekg(_fileStartOffset);
+ uassert(50979,
+ str::stream() << "error seeking starting offset of '" << _fileStartOffset
+ << "' in file \""
+ << _fileName
+ << "\": "
+ << myErrnoWithDescription(),
+ _file.good());
+ }
- massert(16815,
- str::stream() << "unexpected empty file: " << _fileName,
- boost::filesystem::file_size(_fileName) != 0);
+ void closeSource() {
+ _file.close();
+ uassert(50969,
+ str::stream() << "error closing file \"" << _fileName << "\": "
+ << myErrnoWithDescription(),
+ !_file.fail());
}
bool more() {
if (!_done)
- fillIfNeeded(); // may change _done
+ fillBufferIfNeeded(); // may change _done
return !_done;
}
Data next() {
verify(!_done);
- fillIfNeeded();
-
- // Note: key must be read before value so can't pass directly to Data constructor
- auto first = Key::deserializeForSorter(*_reader, _settings.first);
- auto second = Value::deserializeForSorter(*_reader, _settings.second);
+ fillBufferIfNeeded();
+
+ // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
+ // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
+ // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
+ // the Data constructor
+ auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
+ auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
return Data(std::move(first), std::move(second));
}
private:
- void fillIfNeeded() {
+ /**
+ * Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
+ */
+ void fillBufferIfNeeded() {
verify(!_done);
- if (!_reader || _reader->atEof())
- fill();
+ if (!_bufferReader || _bufferReader->atEof())
+ fillBufferFromDisk();
}
- void fill() {
+ /**
+ * Tries to read from disk and places any results in _bufferReader. If there is no more data to
+ * read, then _done is set to true and the function returns immediately.
+ */
+ void fillBufferFromDisk() {
int32_t rawSize;
read(&rawSize, sizeof(rawSize));
if (_done)
@@ -208,7 +240,7 @@ private:
_buffer.reset(new char[blockSize]);
read(_buffer.get(), blockSize);
- massert(16816, "file too short?", !_done);
+ uassert(16816, "file too short?", !_done);
auto encryptionHooks = EncryptionHooks::get(getGlobalServiceContext());
if (encryptionHooks->enabled()) {
@@ -220,7 +252,7 @@ private:
reinterpret_cast<uint8_t*>(out.get()),
blockSize,
&outLen);
- massert(28841,
+ uassert(28841,
str::stream() << "Failed to unprotect data: " << status.toString(),
status.isOK());
blockSize = outLen;
@@ -228,70 +260,85 @@ private:
}
if (!compressed) {
- _reader.reset(new BufReader(_buffer.get(), blockSize));
+ _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
return;
}
dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
size_t uncompressedSize;
- massert(17061,
+ uassert(17061,
"couldn't get uncompressed length",
snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
- massert(17062,
+ uassert(17062,
"decompression failed",
snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));
// hold on to decompressed data and throw out compressed data at block exit
_buffer.swap(decompressionBuffer);
- _reader.reset(new BufReader(_buffer.get(), uncompressedSize));
+ _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
}
- // sets _done to true on EOF - asserts on any other error
+ /**
+ * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
+ *
+ * Masserts on any file errors
+ */
void read(void* out, size_t size) {
- _file.read(reinterpret_cast<char*>(out), size);
- if (!_file.good()) {
- if (_file.eof()) {
- _done = true;
- return;
- }
+ invariant(_file.is_open());
- msgasserted(16817,
- str::stream() << "error reading file \"" << _fileName << "\": "
- << myErrnoWithDescription());
+ if (static_cast<unsigned int>(_file.tellg()) >= _fileEndOffset) {
+ invariant(static_cast<unsigned int>(_file.tellg()) == _fileEndOffset);
+ _done = true;
+ return;
}
+
+ _file.read(reinterpret_cast<char*>(out), size);
+ uassert(16817,
+ str::stream() << "error reading file \"" << _fileName << "\": "
+ << myErrnoWithDescription(),
+ _file.good());
verify(_file.gcount() == static_cast<std::streamsize>(size));
}
const Settings _settings;
bool _done;
std::unique_ptr<char[]> _buffer;
- std::unique_ptr<BufReader> _reader;
- std::string _fileName;
- std::shared_ptr<FileDeleter> _fileDeleter; // Must outlive _file
+ std::unique_ptr<BufReader> _bufferReader;
+ std::string _fileName; // File containing the sorted data range.
+ unsigned int _fileStartOffset; // File offset at which the sorted data range starts.
+ unsigned int _fileEndOffset; // File offset at which the sorted data range ends.
std::ifstream _file;
};
-/** Merge-sorts results from 0 or more FileIterators */
+/**
+ * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
+ * ranges within the same file. This class is given the data source file name upon construction and
+ * is responsible for deleting the data source file upon destruction.
+ */
template <typename Key, typename Value, typename Comparator>
class MergeIterator : public SortIteratorInterface<Key, Value> {
public:
typedef SortIteratorInterface<Key, Value> Input;
typedef std::pair<Key, Value> Data;
-
MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
+ const std::string& itersSourceFileName,
const SortOptions& opts,
const Comparator& comp)
: _opts(opts),
_remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
_first(true),
- _greater(comp) {
+ _greater(comp),
+ _itersSourceFileName(itersSourceFileName) {
for (size_t i = 0; i < iters.size(); i++) {
+ iters[i]->openSource();
if (iters[i]->more()) {
_heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
+ } else {
+ iters[i]->closeSource();
}
}
@@ -306,16 +353,22 @@ public:
_heap.pop_back();
}
+ ~MergeIterator() {
+ // Clear the remaining Stream objects first, to close the file handles before deleting the
+ // file. Some systems will error closing the file if any file handles are still open.
+ _current.reset();
+ _heap.clear();
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_itersSourceFileName));
+ }
+
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
return true;
- // We are done so clean up resources.
- // Can't do this in next() due to lifetime guarantees of unowned Data.
- _heap.clear();
- _current.reset();
_remaining = 0;
-
return false;
}
@@ -331,7 +384,6 @@ public:
if (!_current->advance()) {
verify(!_heap.empty());
-
std::pop_heap(_heap.begin(), _heap.end(), _greater);
_current = _heap.back();
_heap.pop_back();
@@ -346,11 +398,22 @@ public:
private:
- class Stream { // Data + Iterator
+ /**
+ * Data iterator over an Input stream.
+ *
+ * This class is responsible for closing the Input source upon destruction, unfortunately,
+ * because that is the path of least resistence to a design change requiring MergeIterator to
+ * handle eventual deletion of said Input source.
+ */
+ class Stream {
public:
Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
: fileNum(fileNum), _current(first), _rest(rest) {}
+ ~Stream() {
+ _rest->closeSource();
+ }
+
const Data& current() const {
return _current;
}
@@ -396,6 +459,7 @@ private:
std::shared_ptr<Stream> _current;
std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
STLComparator _greater; // named so calls make sense
+ std::string _itersSourceFileName;
};
template <typename Key, typename Value, typename Comparator>
@@ -412,9 +476,22 @@ public:
const Settings& settings = Settings())
: _comp(comp), _settings(settings), _opts(opts), _memUsed(0) {
verify(_opts.limit == 0);
+ if (_opts.extSortAllowed) {
+ _fileName = _opts.tempDir + "/" + nextFileName();
+ }
+ }
+
+ ~NoLimitSorter() {
+ if (!_done) {
+ // If done() was never called to return a MergeIterator, then this Sorter still owns
+ // file deletion.
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
}
void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
_data.push_back(std::make_pair(key, val));
_memUsed += key.memUsageForSorter();
@@ -425,21 +502,17 @@ public:
}
Iterator* done() {
+ invariant(!_done);
+
if (_iters.empty()) {
sort();
return new InMemIterator<Key, Value>(_data);
}
spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
-
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return _iters.size();
- }
- size_t memUsed() const {
- return _memUsed;
+ Iterator* mergeIt = Iterator::merge(_iters, _fileName, _opts, _comp);
+ _done = true;
+ return mergeIt;
}
private:
@@ -465,6 +538,8 @@ private:
}
void spill() {
+ invariant(!_done);
+
this->_usedDisk = true;
if (_data.empty())
return;
@@ -484,12 +559,15 @@ private:
sort();
- SortedFileWriter<Key, Value> writer(_opts, _settings);
+ SortedFileWriter<Key, Value> writer(
+ _opts, _fileName, _nextSortedFileWriterOffset, _settings);
for (; !_data.empty(); _data.pop_front()) {
writer.addAlreadySorted(_data.front().first, _data.front().second);
}
+ Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
@@ -497,6 +575,9 @@ private:
const Comparator _comp;
const Settings _settings;
SortOptions _opts;
+ std::string _fileName;
+ unsigned int _nextSortedFileWriterOffset = 0;
+ bool _done = false;
size_t _memUsed;
std::deque<Data> _data; // the "current" data
std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
@@ -537,14 +618,6 @@ public:
}
}
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return 0;
- }
- size_t memUsed() const {
- return _best.first.memUsageForSorter() + _best.second.memUsageForSorter();
- }
-
private:
const Comparator _comp;
Data _best;
@@ -573,6 +646,10 @@ public:
// This also *works* with limit==1 but LimitOneSorter should be used instead
verify(_opts.limit > 1);
+ if (_opts.extSortAllowed) {
+ _fileName = _opts.tempDir + "/" + nextFileName();
+ }
+
// Preallocate a fixed sized vector of the required size if we
// don't expect it to have a major impact on our memory budget.
// This is the common case with small limits.
@@ -581,7 +658,17 @@ public:
}
}
+ ~TopKSorter() {
+ if (!_done) {
+ // If done() was never called to return a MergeIterator, then this Sorter still owns
+ // file deletion.
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_fileName));
+ }
+ }
+
void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
STLComparator less(_comp);
Data contender(key, val);
@@ -631,15 +718,9 @@ public:
}
spill();
- return Iterator::merge(_iters, _opts, _comp);
- }
-
- // TEMP these are here for compatibility. Will be replaced with a general stats API
- int numFiles() const {
- return _iters.size();
- }
- size_t memUsed() const {
- return _memUsed;
+ Iterator* iterator = Iterator::merge(_iters, _fileName, _opts, _comp);
+ _done = true;
+ return iterator;
}
private:
@@ -744,6 +825,8 @@ private:
}
void spill() {
+ invariant(!_done);
+
this->_usedDisk = true;
if (_data.empty())
return;
@@ -767,7 +850,8 @@ private:
sort();
updateCutoff();
- SortedFileWriter<Key, Value> writer(_opts, _settings);
+ SortedFileWriter<Key, Value> writer(
+ _opts, _fileName, _nextSortedFileWriterOffset, _settings);
for (size_t i = 0; i < _data.size(); i++) {
writer.addAlreadySorted(_data[i].first, _data[i].second);
}
@@ -775,7 +859,9 @@ private:
// clear _data and release backing array's memory
std::vector<Data>().swap(_data);
- _iters.push_back(std::shared_ptr<Iterator>(writer.done()));
+ Iterator* iteratorPtr = writer.done();
+ _nextSortedFileWriterOffset = writer.getFileEndOffset();
+ _iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
_memUsed = 0;
}
@@ -783,6 +869,9 @@ private:
const Comparator _comp;
const Settings _settings;
SortOptions _opts;
+ std::string _fileName;
+ unsigned int _nextSortedFileWriterOffset = 0;
+ bool _done = false;
size_t _memUsed;
std::vector<Data> _data; // the "current" data. Organized as max-heap if size == limit.
std::vector<std::shared_ptr<Iterator>> _iters; // data that has already been spilled
@@ -796,46 +885,44 @@ private:
size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
};
-inline unsigned nextFileNumber() {
- // This is unified across all Sorter types and instances.
- static AtomicUInt32 fileCounter;
- return fileCounter.fetchAndAdd(1);
-}
} // namespace sorter
//
// SortedFileWriter
//
-
template <typename Key, typename Value>
-SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts, const Settings& settings)
+SortedFileWriter<Key, Value>::SortedFileWriter(const SortOptions& opts,
+ const std::string& fileName,
+ const unsigned int fileStartOffset,
+ const Settings& settings)
: _settings(settings) {
namespace str = mongoutils::str;
// This should be checked by consumers, but if we get here don't allow writes.
- massert(
+ uassert(
16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
- massert(17148,
+ uassert(17148,
"Attempting to use external sort without setting SortOptions::tempDir",
!opts.tempDir.empty());
- {
- StringBuilder sb;
- sb << opts.tempDir << "/extsort." << sorter::nextFileNumber();
- _fileName = sb.str();
- }
-
boost::filesystem::create_directories(opts.tempDir);
- _file.open(_fileName.c_str(), std::ios::binary | std::ios::out);
- massert(16818,
+ _fileName = fileName;
+
+ // We open the provided file in append mode so that SortedFileWriter instances can share the
+ // same file, used serially. We want to share files in order to stay below system open file
+ // limits.
+ _file.open(_fileName.c_str(), std::ios::binary | std::ios::app | std::ios::out);
+ uassert(16818,
str::stream() << "error opening file \"" << _fileName << "\": "
<< sorter::myErrnoWithDescription(),
_file.good());
-
- _fileDeleter = std::make_shared<sorter::FileDeleter>(_fileName);
+ // The file descriptor is positioned at the end of a file when opened in append mode, but
+ // _file.tellp() is not initialized on all systems to reflect this. Therefore, we must also pass
+ // in the expected offset to this constructor.
+ _fileStartOffset = fileStartOffset;
// throw on failure
_file.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
@@ -881,7 +968,7 @@ void SortedFileWriter<Key, Value>::spill() {
reinterpret_cast<uint8_t*>(out.get()),
protectedSizeMax,
&resultLen);
- massert(28842,
+ uassert(28842,
str::stream() << "Failed to compress data: " << status.toString(),
status.isOK());
outBuffer = out.get();
@@ -893,7 +980,6 @@ void SortedFileWriter<Key, Value>::spill() {
try {
_file.write(reinterpret_cast<const char*>(&size), sizeof(size));
_file.write(outBuffer, std::abs(size));
-
} catch (const std::exception&) {
msgasserted(16821,
str::stream() << "error writing to file \"" << _fileName << "\": "
@@ -906,8 +992,22 @@ void SortedFileWriter<Key, Value>::spill() {
template <typename Key, typename Value>
SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
spill();
+ long currentFileOffset = _file.tellp();
+ uassert(50980,
+ str::stream() << "error fetching current file descriptor offset in file \"" << _fileName
+ << "\": "
+ << sorter::myErrnoWithDescription(),
+ currentFileOffset >= 0);
+
+ // In case nothing was written to disk, use _fileStartOffset because tellp() may not be
+ // initialized on all systems upon opening the file.
+ _fileEndOffset = static_cast<unsigned int>(currentFileOffset) < _fileStartOffset
+ ? _fileStartOffset
+ : static_cast<unsigned int>(currentFileOffset);
_file.close();
- return new sorter::FileIterator<Key, Value>(_fileName, _settings, _fileDeleter);
+
+ return new sorter::FileIterator<Key, Value>(
+ _fileName, _fileStartOffset, _fileEndOffset, _settings);
}
//
@@ -918,9 +1018,10 @@ template <typename Key, typename Value>
template <typename Comparator>
SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const std::string& fileName,
const SortOptions& opts,
const Comparator& comp) {
- return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);
+ return new sorter::MergeIterator<Key, Value, Comparator>(iters, fileName, opts, comp);
}
template <typename Key, typename Value>
@@ -929,11 +1030,11 @@ Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
const Comparator& comp,
const Settings& settings) {
// This should be checked by consumers, but if it isn't try to fail early.
- massert(16947,
+ uassert(16947,
"Attempting to use external sort from mongos. This is not allowed.",
!(isMongos() && opts.extSortAllowed));
- massert(17149,
+ uassert(17149,
"Attempting to use external sort without setting SortOptions::tempDir",
!(opts.extSortAllowed && opts.tempDir.empty()));
switch (opts.limit) {
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 2a4dbe3db32..9a45f1349f8 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -86,24 +86,28 @@
*/
namespace mongo {
-namespace sorter {
-// Everything in this namespace is internal to the sorter
-class FileDeleter;
-}
/**
* Runtime options that control the Sorter's behavior
*/
struct SortOptions {
- unsigned long long limit; /// number of KV pairs to be returned. 0 for no limit.
- size_t maxMemoryUsageBytes; /// Approximate.
- bool extSortAllowed; /// If false, uassert if more mem needed than allowed.
- std::string tempDir; /// Directory to directly place files in.
- /// Must be explicitly set if extSortAllowed is true.
+ // The number of KV pairs to be returned. 0 indicates no limit.
+ unsigned long long limit;
+
+ // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate.
+ size_t maxMemoryUsageBytes;
+
+ // Whether we are allowed to spill to disk. If this is false and in-memory exceeds
+ // maxMemoryUsageBytes, we will uassert.
+ bool extSortAllowed;
+
+ // Directory into which we place a file when spilling to disk. Must be explicitly set if
+ // extSortAllowed is true.
+ std::string tempDir;
SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {}
- /// Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true)
+ // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true)
SortOptions& Limit(unsigned long long newLimit) {
limit = newLimit;
@@ -126,7 +130,9 @@ struct SortOptions {
}
};
-/// This is the output from the sorting framework
+/**
+ * This is the sorted output iterator from the sorting framework.
+ */
template <typename Key, typename Value>
class SortIteratorInterface {
MONGO_DISALLOW_COPYING(SortIteratorInterface);
@@ -141,18 +147,37 @@ public:
virtual ~SortIteratorInterface() {}
- /// Returns an iterator that merges the passed in iterators
+ // Returns an iterator that merges the passed in iterators
template <typename Comparator>
static SortIteratorInterface* merge(
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const std::string& fileName,
const SortOptions& opts,
const Comparator& comp);
+ // Opens and closes the source of data over which this class iterates, if applicable.
+ virtual void openSource() = 0;
+ virtual void closeSource() = 0;
+
protected:
SortIteratorInterface() {} // can only be constructed as a base
};
-/// This is the main way to input data to the sorting framework
+/**
+ * This is the way to input data to the sorting framework.
+ *
+ * Each instance of this class will generate a file name and spill sorted data ranges to that file
+ * if allowed in its given Settings. If the instance destructs before done() is called, it will
+ * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for
+ * file deletion moves to the returned Iterator object, which must then delete the file upon its own
+ * destruction.
+ *
+ * All users of Sorter implementations must define their own nextFileName() function to generate
+ * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately
+ * directly included in multiple places, rather than compiled in one place and linked, and so cannot
+ * itself provide a globally unique ID for file names. See existing function implementations of
+ * nextFileName() for example.
+ */
template <typename Key, typename Value>
class Sorter {
MONGO_DISALLOW_COPYING(Sorter);
@@ -170,23 +195,29 @@ public:
const Settings& settings = Settings());
virtual void add(const Key&, const Value&) = 0;
- virtual Iterator* done() = 0; /// Can't add more data after calling done()
+
+ /**
+ * Cannot add more data after calling done().
+ *
+ * The returned Iterator must not outlive the Sorter instance, which manages file clean up.
+ */
+ virtual Iterator* done() = 0;
virtual ~Sorter() {}
- // TEMP these are here for compatibility. Will be replaced with a general stats API
bool usedDisk() {
return _usedDisk;
}
- virtual int numFiles() const = 0;
- virtual size_t memUsed() const = 0;
protected:
bool _usedDisk{false}; // Keeps track of whether the sorter used disk or not
Sorter() {} // can only be constructed as a base
};
-/// Writes pre-sorted data to a sorted file and hands-back an Iterator over that file.
+/**
+ * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file
+ * range.
+ */
template <typename Key, typename Value>
class SortedFileWriter {
MONGO_DISALLOW_COPYING(SortedFileWriter);
@@ -197,19 +228,42 @@ public:
typename Value::SorterDeserializeSettings>
Settings;
- explicit SortedFileWriter(const SortOptions& opts, const Settings& settings = Settings());
+ explicit SortedFileWriter(const SortOptions& opts,
+ const std::string& fileName,
+ const unsigned int fileStartOffset,
+ const Settings& settings = Settings());
void addAlreadySorted(const Key&, const Value&);
- Iterator* done(); /// Can't add more data after calling done()
+
+ /**
+ * Spills any data remaining in the buffer to disk and then closes the file to which data was
+ * written.
+ *
+ * No more data can be added via addAlreadySorted() after calling done().
+ */
+ Iterator* done();
+
+ /**
+ * Only call this after done() has been called to set the end offset.
+ */
+ unsigned int getFileEndOffset() {
+ invariant(!_file.is_open());
+ return _fileEndOffset;
+ }
private:
void spill();
const Settings _settings;
std::string _fileName;
- std::shared_ptr<sorter::FileDeleter> _fileDeleter; // Must outlive _file
std::ofstream _file;
BufBuilder _buffer;
+
+ // Tracks where in the file we started and finished writing the sorted data range so that the
+ // information can be given to the Iterator in done(), and to the user via getFileEndOffset()
+ // for the next SortedFileWriter instance using the same file.
+ unsigned int _fileStartOffset;
+ unsigned int _fileEndOffset;
};
}
@@ -233,6 +287,7 @@ private:
template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \
SortIteratorInterface<Key, Value>::merge<Comparator>( \
const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \
+ const std::string& fileName, \
const SortOptions& opts, \
const Comparator& comp); \
template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 449b26daaff..a154912ecce 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -45,12 +45,31 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
+#include <memory>
+
+namespace mongo {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicUInt32 sorterTestFileCounter;
+ return "extsort-sorter-test." + std::to_string(sorterTestFileCounter.fetchAndAdd(1));
+}
+
+} // namespace mongo
+
// Need access to internal classes
#include "mongo/db/sorter/sorter.cpp"
-#include <memory>
-
namespace mongo {
+
using namespace mongo::sorter;
using std::make_shared;
using std::pair;
@@ -109,6 +128,8 @@ class IntIterator : public IWIterator {
public:
IntIterator(int start = 0, int stop = INT_MAX, int increment = 1)
: _current(start), _increment(increment), _stop(stop) {}
+ void openSource() {}
+ void closeSource() {}
bool more() {
if (_increment == 0)
return true;
@@ -130,6 +151,8 @@ private:
class EmptyIterator : public IWIterator {
public:
+ void openSource() {}
+ void closeSource() {}
bool more() {
return false;
}
@@ -145,6 +168,9 @@ public:
verify(limit > 0);
}
+ void openSource() {}
+ void closeSource() {}
+
bool more() {
return _remaining && _source->more();
}
@@ -163,6 +189,8 @@ template <typename It1, typename It2>
void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
int iteration;
try {
+ it1->openSource();
+ it2->openSource();
for (iteration = 0; true; iteration++) {
ASSERT_EQUALS(it1->more(), it2->more());
ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice
@@ -174,10 +202,13 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
ASSERT_EQUALS(pair1.first, pair2.first);
ASSERT_EQUALS(pair1.second, pair2.second);
}
-
+ it1->closeSource();
+ it2->closeSource();
} catch (...) {
mongo::unittest::log() << "Failure from line " << line << " on iteration " << iteration
<< std::endl;
+ it1->closeSource();
+ it2->closeSource();
throw;
}
}
@@ -195,10 +226,11 @@ template <typename IteratorPtr, int N>
std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N],
Direction Dir = ASC,
const SortOptions& opts = SortOptions()) {
+ invariant(!opts.extSortAllowed);
std::vector<std::shared_ptr<IWIterator>> vec;
for (int i = 0; i < N; i++)
vec.push_back(std::shared_ptr<IWIterator>(array[i]));
- return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir)));
+ return std::shared_ptr<IWIterator>(IWIterator::merge(vec, "", opts, IWComparator(Dir)));
}
//
@@ -225,6 +257,8 @@ public:
class UnsortedIter : public IWIterator {
public:
UnsortedIter() : _pos(0) {}
+ void openSource() {}
+ void closeSource() {}
bool more() {
return _pos < sizeof(unsorted) / sizeof(unsorted[0]);
}
@@ -248,7 +282,8 @@ public:
unittest::TempDir tempDir("sortedFileWriterTests");
const SortOptions opts = SortOptions().TempDir(tempDir.path());
{ // small
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts);
+ std::string fileName = opts.tempDir + "/" + nextFileName();
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
sorter.addAlreadySorted(0, 0);
sorter.addAlreadySorted(1, -1);
sorter.addAlreadySorted(2, -2);
@@ -256,14 +291,19 @@ public:
sorter.addAlreadySorted(4, -4);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
make_shared<IntIterator>(0, 5));
+
+ ASSERT_TRUE(boost::filesystem::remove(fileName));
}
{ // big
- SortedFileWriter<IntWrapper, IntWrapper> sorter(opts);
+ std::string fileName = opts.tempDir + "/" + nextFileName();
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, fileName, 0);
for (int i = 0; i < 10 * 1000 * 1000; i++)
sorter.addAlreadySorted(i, -i);
ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
make_shared<IntIterator>(0, 10 * 1000 * 1000));
+
+ ASSERT_TRUE(boost::filesystem::remove(fileName));
}
ASSERT(boost::filesystem::is_empty(tempDir.path()));
@@ -277,7 +317,7 @@ public:
{ // test empty (no inputs)
std::vector<std::shared_ptr<IWIterator>> vec;
std::shared_ptr<IWIterator> mergeIter(
- IWIterator::merge(vec, SortOptions(), IWComparator()));
+ IWIterator::merge(vec, "", SortOptions(), IWComparator()));
ASSERT_ITERATORS_EQUIVALENT(mergeIter, make_shared<EmptyIterator>());
}
{ // test empty (only empty inputs)
@@ -489,12 +529,6 @@ public:
void addData(unowned_ptr<IWSorter> sorter) {
for (int i = 0; i < NUM_ITEMS; i++)
sorter->add(_array[i], -_array[i]);
-
- if (typeid(*this) == typeid(LotsOfDataLittleMemory)) {
- // don't do this check in subclasses since they may set a limit
- ASSERT_GREATER_THAN_OR_EQUALS(static_cast<size_t>(sorter->numFiles()),
- (NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT);
- }
}
virtual std::shared_ptr<IWIterator> correct() {
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
index f9fb4d8317e..4e80684b74a 100644
--- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
@@ -73,6 +73,7 @@ public:
_expCtx = new ExpressionContextForTest(operationContext(),
AggregationRequest{kTestAggregateNss, {}});
_expCtx->mongoProcessInterface = std::make_shared<FakeMongoProcessInterface>();
+ _expCtx->inMongos = true;
}
boost::intrusive_ptr<ExpressionContext> expCtx() {