summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuhong Zhang <danielzhangyh@gmail.com>2021-12-29 20:53:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-30 04:09:14 +0000
commit23166382f8f7d7c9ed322e590e5020d37a29b540 (patch)
tree5aca5561e63a12381996333d592bfae03548ee67
parent91d9b5a0d92784178c1f01e6a5766e6366f1c797 (diff)
downloadmongo-23166382f8f7d7c9ed322e590e5020d37a29b540.tar.gz
Revert "SERVER-62056 Improve `Sorter` code structure"
This reverts commit 4cfcc10775e2cab05a6e30c2516994ab67b9bd7d.
-rw-r--r--src/mongo/db/exec/SConscript9
-rw-r--r--src/mongo/db/exec/sbe/SConscript6
-rw-r--r--src/mongo/db/exec/sbe/stages/sort.cpp26
-rw-r--r--src/mongo/db/exec/sbe/stages/sort.h14
-rw-r--r--src/mongo/db/exec/sort_executor.cpp (renamed from src/mongo/db/sorter/null_value.h)52
-rw-r--r--src/mongo/db/exec/sort_executor.h35
-rw-r--r--src/mongo/db/index/SConscript6
-rw-r--r--src/mongo/db/index/index_access_method.cpp82
-rw-r--r--src/mongo/db/index/index_access_method.h5
-rw-r--r--src/mongo/db/pipeline/SConscript6
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h4
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp42
-rw-r--r--src/mongo/db/pipeline/document_source_group.h9
-rw-r--r--src/mongo/db/sorter/SConscript20
-rw-r--r--src/mongo/db/sorter/compression.cpp50
-rw-r--r--src/mongo/db/sorter/compression.h43
-rw-r--r--src/mongo/db/sorter/factory.h68
-rw-r--r--src/mongo/db/sorter/file.cpp143
-rw-r--r--src/mongo/db/sorter/file.h94
-rw-r--r--src/mongo/db/sorter/file_iterator.h197
-rw-r--r--src/mongo/db/sorter/in_mem_iterator.h67
-rw-r--r--src/mongo/db/sorter/limit_one_sorter.h84
-rw-r--r--src/mongo/db/sorter/merge_iterator.h150
-rw-r--r--src/mongo/db/sorter/no_limit_sorter.h129
-rw-r--r--src/mongo/db/sorter/options.h52
-rw-r--r--src/mongo/db/sorter/single_elem_iterator.h65
-rw-r--r--src/mongo/db/sorter/sorted_data_iterator.h71
-rw-r--r--src/mongo/db/sorter/sorted_file_writer.h145
-rw-r--r--src/mongo/db/sorter/sorter.cpp1231
-rw-r--r--src/mongo/db/sorter/sorter.h400
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp425
-rw-r--r--src/mongo/db/sorter/spillable_sorter.h149
-rw-r--r--src/mongo/db/sorter/top_k_sorter.h218
-rw-r--r--src/mongo/db/sorter/util.cpp60
-rw-r--r--src/mongo/db/sorter/util.h85
36 files changed, 2012 insertions, 2263 deletions
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index a52a61b14b1..88655a990db 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -64,20 +64,25 @@ env.Library(
],
)
-env.Library(
+sortExecutorEnv = env.Clone()
+sortExecutorEnv.InjectThirdParty(libraries=['snappy'])
+sortExecutorEnv.Library(
target="sort_executor",
source=[
+ "sort_executor.cpp",
"sort_key_comparator.cpp",
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/query/sort_pattern',
- '$BUILD_DIR/mongo/db/sorter/sorter',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/third_party/shim_snappy',
'working_set',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
+ ],
)
env.Library(
diff --git a/src/mongo/db/exec/sbe/SConscript b/src/mongo/db/exec/sbe/SConscript
index c9670f9384d..6f26193db8f 100644
--- a/src/mongo/db/exec/sbe/SConscript
+++ b/src/mongo/db/exec/sbe/SConscript
@@ -29,7 +29,9 @@ env.Library(
]
)
-env.Library(
+sbeEnv = env.Clone()
+sbeEnv.InjectThirdParty(libraries=['snappy'])
+sbeEnv.Library(
target='query_sbe',
source=[
'expressions/expression.cpp',
@@ -80,7 +82,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
- '$BUILD_DIR/mongo/db/sorter/sorter',
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
]
)
diff --git a/src/mongo/db/exec/sbe/stages/sort.cpp b/src/mongo/db/exec/sbe/stages/sort.cpp
index 42db5e6163a..5acf73afe8d 100644
--- a/src/mongo/db/exec/sbe/stages/sort.cpp
+++ b/src/mongo/db/exec/sbe/stages/sort.cpp
@@ -34,11 +34,18 @@
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/size_estimator.h"
#include "mongo/db/exec/trial_run_tracker.h"
-#include "mongo/db/sorter/factory.h"
-#include "mongo/db/sorter/options.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
#include "mongo/util/str.h"
+namespace {
+std::string nextFileName() {
+ static mongo::AtomicWord<unsigned> sortExecutorFileCounter;
+ return "extsort-sort-sbe." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1));
+}
+} // namespace
+
+#include "mongo/db/sorter/sorter.cpp"
+
namespace mongo {
namespace sbe {
SortStage::SortStage(std::unique_ptr<PlanStage> input,
@@ -113,13 +120,13 @@ value::SlotAccessor* SortStage::getAccessor(CompileCtx& ctx, value::SlotId slot)
}
void SortStage::makeSorter() {
- sorter::Options opts;
- if (_allowDiskUse) {
- opts.tempDir = storageGlobalParams.dbpath + "/_tmp";
- }
+ SortOptions opts;
+ opts.tempDir = storageGlobalParams.dbpath + "/_tmp";
opts.maxMemoryUsageBytes = _specificStats.maxMemoryUsageBytes;
+ opts.extSortAllowed = _allowDiskUse;
opts.limit =
_specificStats.limit != std::numeric_limits<size_t>::max() ? _specificStats.limit : 0;
+ opts.moveSortedDataIntoIterator = true;
auto comp = [&](const SorterData& lhs, const SorterData& rhs) {
auto size = lhs.first.size();
@@ -139,8 +146,7 @@ void SortStage::makeSorter() {
return 0;
};
- _sorter =
- sorter::make<value::MaterializedRow, value::MaterializedRow>("sort-sbe", opts, comp, {});
+ _sorter.reset(Sorter<value::MaterializedRow, value::MaterializedRow>::make(opts, comp, {}));
_mergeIt.reset();
}
@@ -188,7 +194,7 @@ void SortStage::open(bool reOpen) {
vals.reset(idx++, true, cTag, cVal);
}
- _sorter->addOwned(std::move(keys), std::move(vals));
+ _sorter->emplace(std::move(keys), std::move(vals));
if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) {
// If we either hit the maximum number of document to return during the trial run, or
@@ -206,7 +212,7 @@ void SortStage::open(bool reOpen) {
}
_specificStats.totalDataSizeBytes += _sorter->totalDataSizeSorted();
- _mergeIt = _sorter->done();
+ _mergeIt.reset(_sorter->done());
_specificStats.spills += _sorter->numSpills();
_specificStats.keysSorted += _sorter->numSorted();
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
diff --git a/src/mongo/db/exec/sbe/stages/sort.h b/src/mongo/db/exec/sbe/stages/sort.h
index ef2ea38c5a8..2bfc9e1d9fb 100644
--- a/src/mongo/db/exec/sbe/stages/sort.h
+++ b/src/mongo/db/exec/sbe/stages/sort.h
@@ -30,8 +30,13 @@
#pragma once
#include "mongo/db/exec/sbe/stages/stages.h"
-#include "mongo/db/sorter/sorted_data_iterator.h"
-#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+template <typename Key, typename Value>
+class SortIteratorInterface;
+template <typename Key, typename Value>
+class Sorter;
+} // namespace mongo
namespace mongo::sbe {
/**
@@ -90,8 +95,7 @@ protected:
private:
void makeSorter();
- using SorterIterator =
- sorter::SortedDataIterator<value::MaterializedRow, value::MaterializedRow>;
+ using SorterIterator = SortIteratorInterface<value::MaterializedRow, value::MaterializedRow>;
using SorterData = std::pair<value::MaterializedRow, value::MaterializedRow>;
const value::SlotVector _obs;
@@ -108,7 +112,7 @@ private:
std::unique_ptr<SorterIterator> _mergeIt;
SorterData _mergeData;
SorterData* _mergeDataIt{&_mergeData};
- std::unique_ptr<sorter::Sorter<value::MaterializedRow, value::MaterializedRow>> _sorter;
+ std::unique_ptr<Sorter<value::MaterializedRow, value::MaterializedRow>> _sorter;
// If provided, used during a trial run to accumulate certain execution stats. Once the trial
// run is complete, this pointer is reset to nullptr.
diff --git a/src/mongo/db/sorter/null_value.h b/src/mongo/db/exec/sort_executor.cpp
index 6f83a902957..f23475b5eac 100644
--- a/src/mongo/db/sorter/null_value.h
+++ b/src/mongo/db/exec/sort_executor.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,31 +27,37 @@
* it in the license file.
*/
-#pragma once
+#include "mongo/platform/basic.h"
-#include "mongo/bson/util/builder.h"
-#include "mongo/util/bufreader.h"
+#include "mongo/db/exec/sort_executor.h"
-namespace mongo::sorter {
+#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/exec/working_set.h"
+
+namespace mongo {
+namespace {
/**
- * 0-sized dummy object that satisfies Sorter's Key/Value interface.
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
*/
-class NullValue {
-public:
- struct SorterDeserializeSettings {};
-
- void serializeForSorter(BufBuilder&) const {}
-
- static NullValue deserializeForSorter(BufReader&, const SorterDeserializeSettings&) {
- return {};
- }
+std::string nextFileName() {
+ static AtomicWord<unsigned> sortExecutorFileCounter;
+ return "extsort-sort-executor." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1));
+}
+} // namespace
+} // namespace mongo
- int memUsageForSorter() const {
- return 0;
- }
+#include "mongo/db/sorter/sorter.cpp"
- NullValue getOwned() const {
- return {};
- }
-};
-} // namespace mongo::sorter
+MONGO_CREATE_SORTER(mongo::Value,
+ mongo::Document,
+ mongo::SortExecutor<mongo::Document>::Comparator);
+MONGO_CREATE_SORTER(mongo::Value,
+ mongo::SortableWorkingSetMember,
+ mongo::SortExecutor<mongo::SortableWorkingSetMember>::Comparator);
+MONGO_CREATE_SORTER(mongo::Value, mongo::BSONObj, mongo::SortExecutor<mongo::BSONObj>::Comparator);
diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h
index bb094b8f5f5..6509c767e9a 100644
--- a/src/mongo/db/exec/sort_executor.h
+++ b/src/mongo/db/exec/sort_executor.h
@@ -35,7 +35,6 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/query/sort_pattern.h"
-#include "mongo/db/sorter/factory.h"
#include "mongo/db/sorter/sorter.h"
namespace mongo {
@@ -52,7 +51,18 @@ namespace mongo {
template <typename T>
class SortExecutor {
public:
- using DocumentSorter = sorter::Sorter<Value, T>;
+ using DocumentSorter = Sorter<Value, T>;
+ class Comparator {
+ public:
+ Comparator(const SortPattern& sortPattern) : _sortKeyComparator(sortPattern) {}
+ int operator()(const typename DocumentSorter::Data& lhs,
+ const typename DocumentSorter::Data& rhs) const {
+ return _sortKeyComparator(lhs.first, rhs.first);
+ }
+
+ private:
+ SortKeyComparator _sortKeyComparator;
+ };
/**
* If the passed in limit is 0, this is treated as no limit.
@@ -114,7 +124,7 @@ public:
*/
void add(const Value& sortKey, const T& data) {
if (!_sorter) {
- _sorter = sorter::make<Value, T>("sort-executor", makeSortOptions(), _getCompFn());
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
}
_sorter->add(sortKey, data);
}
@@ -125,12 +135,13 @@ public:
void loadingDone() {
// This conditional should only pass if no documents were added to the sorter.
if (!_sorter) {
- _sorter = sorter::make<Value, T>("sort-executor", makeSortOptions(), _getCompFn());
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
}
- _output = _sorter->done();
+ _output.reset(_sorter->done());
_stats.keysSorted += _sorter->numSorted();
_stats.spills += _sorter->numSpills();
_stats.totalDataSizeBytes += _sorter->totalDataSizeSorted();
+ _sorter.reset();
}
/**
@@ -144,7 +155,6 @@ public:
if (!_output->more()) {
_output.reset();
- _sorter.reset();
_isEOF = true;
return false;
}
@@ -166,28 +176,21 @@ public:
}
private:
- sorter::Options makeSortOptions() const {
- sorter::Options opts;
+ SortOptions makeSortOptions() const {
+ SortOptions opts;
if (_stats.limit) {
opts.limit = _stats.limit;
}
opts.maxMemoryUsageBytes = _stats.maxMemoryUsageBytes;
if (_diskUseAllowed) {
+ opts.extSortAllowed = true;
opts.tempDir = _tempDir;
}
return opts;
}
- auto _getCompFn() const {
- return [sortKeyComparator =
- SortKeyComparator{_sortPattern}](const typename DocumentSorter::Data& lhs,
- const typename DocumentSorter::Data& rhs) {
- return sortKeyComparator(lhs.first, rhs.first);
- };
- }
-
const SortPattern _sortPattern;
const std::string _tempDir;
const bool _diskUseAllowed;
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index fd6228cb8b2..9d3e92da6dd 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -113,7 +113,9 @@ env.Library(
],
)
-env.Library(
+serveronlyEnv = env.Clone()
+serveronlyEnv.InjectThirdParty(libraries=['snappy'])
+serveronlyEnv.Library(
target="index_access_method",
source=[
"index_access_method.cpp"
@@ -124,7 +126,7 @@ env.Library(
'$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
- '$BUILD_DIR/mongo/db/sorter/sorter',
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
'$BUILD_DIR/mongo/db/storage/execution_context',
'$BUILD_DIR/mongo/db/storage/index_entry_comparison',
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 5568a982c33..516fd55a841 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -50,7 +50,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/timestamp_block.h"
-#include "mongo/db/sorter/factory.h"
#include "mongo/db/storage/execution_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
@@ -83,12 +82,12 @@ bool isMultikeyFromPaths(const MultikeyPaths& multikeyPaths) {
[](const MultikeyComponents& components) { return !components.empty(); });
}
-sorter::Options makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) {
- sorter::Options options;
- options.tempDir = storageGlobalParams.dbpath + "/_tmp";
- options.maxMemoryUsageBytes = maxMemoryUsageBytes;
- options.dbName = dbName.toString();
- return options;
+SortOptions makeSortOptions(size_t maxMemoryUsageBytes, StringData dbName) {
+ return SortOptions()
+ .TempDir(storageGlobalParams.dbpath + "/_tmp")
+ .ExtSortAllowed()
+ .MaxMemoryUsageBytes(maxMemoryUsageBytes)
+ .DBName(dbName.toString());
}
MultikeyPaths createMultikeyPaths(const std::vector<MultikeyPath>& multikeyPathsVec) {
@@ -102,13 +101,15 @@ MultikeyPaths createMultikeyPaths(const std::vector<MultikeyPath>& multikeyPaths
return multikeyPaths;
}
-auto btreeExternalSortComparison = [](const std::pair<KeyString::Value, sorter::NullValue>& lhs,
- const std::pair<KeyString::Value, sorter::NullValue>& rhs) {
- return lhs.first.compare(rhs.first);
-};
-
} // namespace
+struct BtreeExternalSortComparison {
+ typedef std::pair<KeyString::Value, mongo::NullValue> Data;
+ int operator()(const Data& l, const Data& r) const {
+ return l.first.compare(r.first);
+ }
+};
+
AbstractIndexAccessMethod::AbstractIndexAccessMethod(const IndexCatalogEntry* btreeState,
std::unique_ptr<SortedDataInterface> btree)
: _indexCatalogEntry(btreeState),
@@ -520,7 +521,7 @@ public:
* Inserts all multikey metadata keys cached during the BulkBuilder's lifetime into the
* underlying Sorter, finalizes it, and returns an iterator over the sorted dataset.
*/
- std::unique_ptr<Sorter::Iterator> done() final;
+ Sorter::Iterator* done() final;
int64_t getKeysInserted() const final;
@@ -529,7 +530,7 @@ public:
private:
void _insertMultikeyMetadataKeysIntoSorter();
- std::unique_ptr<Sorter> _makeSorter(
+ Sorter* _makeSorter(
size_t maxMemoryUsageBytes,
StringData dbName,
boost::optional<StringData> fileName = boost::none,
@@ -644,7 +645,7 @@ Status AbstractIndexAccessMethod::BulkBuilderImpl::insert(
}
for (const auto& keyString : *keys) {
- _sorter->add(keyString, sorter::NullValue());
+ _sorter->add(keyString, mongo::NullValue());
++_keysInserted;
}
@@ -663,10 +664,10 @@ bool AbstractIndexAccessMethod::BulkBuilderImpl::isMultikey() const {
return _isMultiKey;
}
-std::unique_ptr<IndexAccessMethod::BulkBuilder::Sorter::Iterator>
+IndexAccessMethod::BulkBuilder::Sorter::Iterator*
AbstractIndexAccessMethod::BulkBuilderImpl::done() {
_insertMultikeyMetadataKeysIntoSorter();
- return _sorter->done(Sorter::Iterator::ReturnPolicy::kCopy);
+ return _sorter->done();
}
int64_t AbstractIndexAccessMethod::BulkBuilderImpl::getKeysInserted() const {
@@ -681,7 +682,7 @@ AbstractIndexAccessMethod::BulkBuilderImpl::persistDataForShutdown() {
void AbstractIndexAccessMethod::BulkBuilderImpl::_insertMultikeyMetadataKeysIntoSorter() {
for (const auto& keyString : _multikeyMetadataKeys) {
- _sorter->add(keyString, sorter::NullValue());
+ _sorter->add(keyString, mongo::NullValue());
++_keysInserted;
}
@@ -693,27 +694,24 @@ void AbstractIndexAccessMethod::BulkBuilderImpl::_insertMultikeyMetadataKeysInto
AbstractIndexAccessMethod::BulkBuilderImpl::Sorter::Settings
AbstractIndexAccessMethod::BulkBuilderImpl::_makeSorterSettings() const {
return std::pair<KeyString::Value::SorterDeserializeSettings,
- sorter::NullValue::SorterDeserializeSettings>(
+ mongo::NullValue::SorterDeserializeSettings>(
{_indexCatalogEntry->accessMethod()->getSortedDataInterface()->getKeyStringVersion()}, {});
}
-std::unique_ptr<AbstractIndexAccessMethod::BulkBuilderImpl::Sorter>
+AbstractIndexAccessMethod::BulkBuilderImpl::Sorter*
AbstractIndexAccessMethod::BulkBuilderImpl::_makeSorter(
size_t maxMemoryUsageBytes,
StringData dbName,
boost::optional<StringData> fileName,
const boost::optional<std::vector<SorterRange>>& ranges) const {
- return fileName ? sorter::makeFromExistingRanges<KeyString::Value, sorter::NullValue>(
- fileName->toString(),
- *ranges,
- makeSortOptions(maxMemoryUsageBytes, dbName),
- btreeExternalSortComparison,
- _makeSorterSettings())
- : sorter::make<KeyString::Value, sorter::NullValue>(
- "index",
- makeSortOptions(maxMemoryUsageBytes, dbName),
- btreeExternalSortComparison,
- _makeSorterSettings());
+ return fileName ? Sorter::makeFromExistingRanges(fileName->toString(),
+ *ranges,
+ makeSortOptions(maxMemoryUsageBytes, dbName),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings())
+ : Sorter::make(makeSortOptions(maxMemoryUsageBytes, dbName),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings());
}
void AbstractIndexAccessMethod::_yieldBulkLoad(OperationContext* opCtx,
@@ -759,7 +757,7 @@ Status AbstractIndexAccessMethod::commitBulk(OperationContext* opCtx,
auto ns = _indexCatalogEntry->getNSSFromCatalog(opCtx);
- auto it = bulk->done();
+ std::unique_ptr<BulkBuilder::Sorter::Iterator> it(bulk->done());
static constexpr char message[] = "Index Build: inserting keys from external sorter into index";
ProgressMeterHolder pm;
@@ -964,6 +962,23 @@ SortedDataInterface* AbstractIndexAccessMethod::getSortedDataInterface() const {
return _newInterface.get();
}
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number. Each name is suffixed with a random number generated at startup, to prevent name
+ * collisions when the index build external sort files are preserved across restarts.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> indexAccessMethodFileCounter;
+ static const int64_t randomSuffix = SecureRandom().nextInt64();
+ return str::stream() << "extsort-index." << indexAccessMethodFileCounter.fetchAndAdd(1) << '-'
+ << randomSuffix;
+}
+
Status AbstractIndexAccessMethod::_handleDuplicateKey(OperationContext* opCtx,
const KeyString::Value& dataKey,
const RecordIdHandlerFn& onDuplicateRecord) {
@@ -982,3 +997,6 @@ Status AbstractIndexAccessMethod::_handleDuplicateKey(OperationContext* opCtx,
_descriptor->collation());
}
} // namespace mongo
+
+#include "mongo/db/sorter/sorter.cpp"
+MONGO_CREATE_SORTER(mongo::KeyString::Value, mongo::NullValue, mongo::BtreeExternalSortComparison);
diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h
index e9128ca85bb..8e1e4fa36f8 100644
--- a/src/mongo/db/index/index_access_method.h
+++ b/src/mongo/db/index/index_access_method.h
@@ -40,7 +40,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
-#include "mongo/db/sorter/null_value.h"
#include "mongo/db/sorter/sorter.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/db/yieldable.h"
@@ -238,7 +237,7 @@ public:
class BulkBuilder {
public:
- using Sorter = sorter::Sorter<KeyString::Value, sorter::NullValue>;
+ using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>;
virtual ~BulkBuilder() = default;
@@ -270,7 +269,7 @@ public:
* Inserts all multikey metadata keys cached during the BulkBuilder's lifetime into the
* underlying Sorter, finalizes it, and returns an iterator over the sorted dataset.
*/
- virtual std::unique_ptr<Sorter::Iterator> done() = 0;
+ virtual Sorter::Iterator* done() = 0;
/**
* Returns number of keys inserted using this BulkBuilder.
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b63691a47b7..097f55717b6 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -257,7 +257,9 @@ env.Library(
],
)
-env.Library(
+pipelineEnv = env.Clone()
+pipelineEnv.InjectThirdParty(libraries=['snappy'])
+pipelineEnv.Library(
target='pipeline',
source=[
'document_source.cpp',
@@ -371,7 +373,7 @@ env.Library(
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/query/projection_ast',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
- '$BUILD_DIR/mongo/db/sorter/sorter',
+ '$BUILD_DIR/mongo/db/sorter/sorter_idl',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'$BUILD_DIR/mongo/rpc/command_status',
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 71dc075e414..1125589e4f9 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/db/sorter/factory.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
namespace mongo {
@@ -69,6 +68,21 @@ boost::intrusive_ptr<Expression> parseGroupByExpression(
}
}
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> documentSourceBucketAutoFileCounter;
+ return "extsort-doc-bucket." +
+ std::to_string(documentSourceBucketAutoFileCounter.fetchAndAdd(1));
+}
+
} // namespace
const char* DocumentSourceBucketAuto::getSourceName() const {
@@ -130,18 +144,19 @@ DepsTracker::State DocumentSourceBucketAuto::getDependencies(DepsTracker* deps)
DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
if (!_sorter) {
- sorter::Options opts;
+ SortOptions opts;
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
+ opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
const auto& valueCmp = pExpCtx->getValueComparator();
- auto comparator = [valueCmp](const sorter::Sorter<Value, Document>::Data& lhs,
- const sorter::Sorter<Value, Document>::Data& rhs) {
+ auto comparator = [valueCmp](const Sorter<Value, Document>::Data& lhs,
+ const Sorter<Value, Document>::Data& rhs) {
return valueCmp.compare(lhs.first, rhs.first);
};
- _sorter = sorter::make<Value, Document>("doc-bucket", opts, comparator);
+ _sorter.reset(Sorter<Value, Document>::make(opts, comparator));
}
auto next = pSource->getNext();
@@ -200,12 +215,14 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>&
void DocumentSourceBucketAuto::initalizeBucketIteration() {
// Initialize the iterator on '_sorter'.
invariant(_sorter);
- _sortedInput = _sorter->done();
+ _sortedInput.reset(_sorter->done());
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx);
metricsCollector.incrementKeysSorted(_sorter->numSorted());
metricsCollector.incrementSorterSpills(_sorter->numSpills());
+ _sorter.reset();
+
// If there are no buckets, then we don't need to populate anything.
if (_nBuckets == 0) {
return;
@@ -351,7 +368,6 @@ Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) {
void DocumentSourceBucketAuto::doDispose() {
_sortedInput.reset();
- _sorter.reset();
}
Value DocumentSourceBucketAuto::serialize(
@@ -505,3 +521,6 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
}
} // namespace mongo
+
+#include "mongo/db/sorter/sorter.cpp"
+// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 246cf2dfc62..44bd1e3f8f0 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -158,8 +158,8 @@ private:
*/
Document makeDocument(const Bucket& bucket);
- std::unique_ptr<sorter::Sorter<Value, Document>> _sorter;
- std::unique_ptr<sorter::Sorter<Value, Document>::Iterator> _sortedInput;
+ std::unique_ptr<Sorter<Value, Document>> _sorter;
+ std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput;
std::vector<AccumulationStatement> _accumulatedFields;
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 6f6cd4f3694..73f3a76d81d 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -40,13 +40,29 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/db/sorter/merge_iterator.h"
-#include "mongo/db/sorter/sorted_file_writer.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
#include "mongo/util/destructor_guard.h"
namespace mongo {
+namespace {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> documentSourceGroupFileCounter;
+ return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
using boost::intrusive_ptr;
using std::pair;
using std::shared_ptr;
@@ -399,10 +415,10 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
? *maxMemoryUsageBytes
: static_cast<size_t>(internalDocumentSourceGroupMaxMemoryBytes.load())},
// We spill to disk in debug mode, regardless of allowDiskUse, to stress the system.
- _file(!expCtx->inMongos && (expCtx->allowDiskUse || kDebugBuild)
- ? std::make_unique<sorter::File>(expCtx->tempDir + "/" +
- sorter::nextFileName("doc-group"))
- : nullptr),
+ _file(
+ !expCtx->inMongos && (expCtx->allowDiskUse || kDebugBuild)
+ ? std::make_shared<Sorter<Value, Value>::File>(expCtx->tempDir + "/" + nextFileName())
+ : nullptr),
_initialized(false),
_groups(expCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
@@ -625,8 +641,8 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi
// We won't be using groups again so free its memory.
_groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
- _sorterIterator = std::make_unique<sorter::MergeIterator<Value, Value>>(
- _sortedFiles, 0, SorterComparator(pExpCtx->getValueComparator()));
+ _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
+ _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
// prepare current to accumulate data
_currentAccumulators.reserve(numAccumulators);
@@ -650,7 +666,7 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi
MONGO_UNREACHABLE;
}
-std::unique_ptr<sorter::Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
+shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
_stats.spills++;
vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting
@@ -661,7 +677,7 @@ std::unique_ptr<sorter::Sorter<Value, Value>::Iterator> DocumentSourceGroup::spi
stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
- sorter::SortedFileWriter<Value, Value> writer(_file.get());
+ SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file);
switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i.
case 0: // no values, essentially a distinct
for (size_t i = 0; i < ptrs.size(); i++) {
@@ -698,7 +714,8 @@ std::unique_ptr<sorter::Sorter<Value, Value>::Iterator> DocumentSourceGroup::spi
_memoryTracker.set(accum.fieldName, 0);
}
- return writer.done();
+ Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
+ return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
}
Value DocumentSourceGroup::computeId(const Document& root) {
@@ -883,3 +900,6 @@ size_t DocumentSourceGroup::getMaxMemoryUsageBytes() const {
}
} // namespace mongo
+
+#include "mongo/db/sorter/sorter.cpp"
+// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 0dac3b5ed8e..c9f635af90e 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -37,7 +37,6 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/pipeline/transformer_interface.h"
-#include "mongo/db/sorter/file.h"
#include "mongo/db/sorter/sorter.h"
namespace mongo {
@@ -234,7 +233,7 @@ private:
* does not exhaust the previous stage before returning, and thus does not maintain as large a
* store of documents at any one time, only an unsorted group can spill to disk.
*/
- std::unique_ptr<sorter::Sorter<Value, Value>::Iterator> spill();
+ std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
/**
* If we ran out of memory, finish all the pending operations so that some memory
@@ -276,7 +275,7 @@ private:
GroupStats _stats;
- std::unique_ptr<sorter::File> _file;
+ std::shared_ptr<Sorter<Value, Value>::File> _file;
std::vector<std::string> _idFieldNames; // used when id is a document
std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
@@ -291,14 +290,14 @@ private:
// definition of equality.
boost::optional<GroupsMap> _groups;
- std::vector<std::unique_ptr<sorter::Sorter<Value, Value>::Iterator>> _sortedFiles;
+ std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
bool _spilled;
// Only used when '_spilled' is false.
GroupsMap::iterator groupsIterator;
// Only used when '_spilled' is true.
- std::unique_ptr<sorter::Sorter<Value, Value>::Iterator> _sorterIterator;
+ std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
std::pair<Value, Value> _firstPartOfNextGroup;
diff --git a/src/mongo/db/sorter/SConscript b/src/mongo/db/sorter/SConscript
index 491a2a67d97..26084eca010 100644
--- a/src/mongo/db/sorter/SConscript
+++ b/src/mongo/db/sorter/SConscript
@@ -16,28 +16,8 @@ sorterEnv.CppUnitTest(
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/third_party/shim_snappy',
- 'sorter',
- ],
-)
-
-sorterEnv.Library(
- target='sorter',
- source=[
- 'compression.cpp',
- 'file.cpp',
- 'util.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/idl/idl_parser',
- '$BUILD_DIR/third_party/shim_snappy',
'sorter_idl',
],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/db/storage/encryption_hooks',
- '$BUILD_DIR/mongo/s/is_mongos',
- ],
)
env.Library(
diff --git a/src/mongo/db/sorter/compression.cpp b/src/mongo/db/sorter/compression.cpp
deleted file mode 100644
index cd5e3b07c51..00000000000
--- a/src/mongo/db/sorter/compression.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/sorter/compression.h"
-
-#include <snappy.h>
-
-namespace mongo::sorter {
-bool compress(const char* buffer, std::size_t size, std::string* output) {
- return snappy::Compress(buffer, size, output);
-}
-
-bool decompress(const char* buffer, std::size_t size, char* output) {
- return snappy::RawUncompress(buffer, size, output);
-}
-
-bool getUncompressedSize(const char* buffer, std::size_t size, std::size_t* result) {
- return snappy::GetUncompressedLength(buffer, size, result);
-}
-
-bool isValidCompressedBuffer(const char* buffer, std::size_t size) {
- return snappy::IsValidCompressedBuffer(buffer, size);
-}
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/compression.h b/src/mongo/db/sorter/compression.h
deleted file mode 100644
index 7afe26c137c..00000000000
--- a/src/mongo/db/sorter/compression.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <cstdint>
-#include <string>
-
-namespace mongo::sorter {
-bool compress(const char* buffer, std::size_t size, std::string* output);
-
-bool decompress(const char* buffer, std::size_t size, char* output);
-
-bool getUncompressedSize(const char* buffer, std::size_t size, std::size_t* result);
-
-bool isValidCompressedBuffer(const char* buffer, std::size_t size);
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/factory.h b/src/mongo/db/sorter/factory.h
deleted file mode 100644
index 40b3158f09c..00000000000
--- a/src/mongo/db/sorter/factory.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/limit_one_sorter.h"
-#include "mongo/db/sorter/no_limit_sorter.h"
-#include "mongo/db/sorter/top_k_sorter.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-std::unique_ptr<Sorter<Key, Value>> make(
- StringData name,
- const Options& options,
- const typename Sorter<Key, Value>::CompFn& comp,
- const typename Sorter<Key, Value>::Settings& settings = {}) {
- switch (options.limit) {
- case 0:
- return std::make_unique<NoLimitSorter<Key, Value>>(name, options, comp, settings);
- case 1:
- return std::make_unique<LimitOneSorter<Key, Value>>(comp);
- default:
- return std::make_unique<TopKSorter<Key, Value>>(name, options, comp, settings);
- }
-}
-
-template <typename Key, typename Value>
-std::unique_ptr<Sorter<Key, Value>> makeFromExistingRanges(
- const std::string& fileName,
- const std::vector<SorterRange>& ranges,
- const Options& options,
- const typename Sorter<Key, Value>::CompFn& comp,
- const typename Sorter<Key, Value>::Settings& settings = {}) {
- invariant(options.tempDir);
- invariant(options.limit == 0,
- str::stream() << "Creating a Sorter from existing ranges is only available with the "
- "NoLimitSorter (limit 0), but got limit "
- << options.limit);
-
- return std::make_unique<NoLimitSorter<Key, Value>>(fileName, ranges, options, comp, settings);
-}
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/file.cpp b/src/mongo/db/sorter/file.cpp
deleted file mode 100644
index f32d221c79c..00000000000
--- a/src/mongo/db/sorter/file.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/sorter/file.h"
-
-#include <boost/filesystem/operations.hpp>
-
-#include "mongo/util/destructor_guard.h"
-
-namespace mongo::sorter {
-namespace {
-// We need to use the "real" errno everywhere, not GetLastError() on Windows.
-std::string getError() {
- int errnoCopy = errno;
- StringBuilder sb;
- sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy);
- return sb.str();
-}
-} // namespace
-
-File::~File() {
- if (_keep) {
- return;
- }
-
- if (_file.is_open()) {
- DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit));
- DESTRUCTOR_GUARD(_file.close());
- }
-
- DESTRUCTOR_GUARD(boost::filesystem::remove(_path));
-}
-
-void File::read(std::streamoff offset, std::streamsize size, void* out) {
- if (!_file.is_open()) {
- _open();
- }
-
- if (_offset != -1) {
- _file.exceptions(std::ios::goodbit);
- _file.flush();
- _offset = -1;
-
- uassert(5479100,
- str::stream() << "Error flushing file " << _path.string() << ": " << getError(),
- _file);
- }
-
- _file.seekg(offset);
- _file.read(reinterpret_cast<char*>(out), size);
-
- uassert(16817,
- str::stream() << "Error reading file " << _path.string() << ": " << getError(),
- _file);
-
- invariant(_file.gcount() == size,
- str::stream() << "Number of bytes read (" << _file.gcount()
- << ") not equal to expected number (" << size << ")");
-
- uassert(51049,
- str::stream() << "Error reading file " << _path.string() << ": " << getError(),
- _file.tellg() >= 0);
-}
-
-void File::write(const char* data, std::streamsize size) {
- _ensureOpenForWriting();
-
- try {
- _file.write(data, size);
- _offset += size;
- } catch (const std::system_error& ex) {
- if (ex.code() == std::errc::no_space_on_device) {
- uasserted(ErrorCodes::OutOfDiskSpace,
- str::stream() << ex.what() << ": " << _path.string());
- }
- uasserted(5642403,
- str::stream() << "Error writing to file " << _path.string() << ": "
- << getError());
- } catch (const std::exception&) {
- uasserted(16821,
- str::stream() << "Error writing to file " << _path.string() << ": "
- << getError());
- }
-}
-
-std::streamoff File::currentOffset() {
- _ensureOpenForWriting();
- return _offset;
-}
-
-void File::_open() {
- invariant(!_file.is_open());
-
- boost::filesystem::create_directories(_path.parent_path());
-
- // We open the provided file in append mode so that SortedFileWriter instances can share
- // the same file, used serially. We want to share files in order to stay below system
- // open file limits.
- _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out);
-
- uassert(16818,
- str::stream() << "Error opening file " << _path.string() << ": " << getError(),
- _file.good());
-}
-
-void File::_ensureOpenForWriting() {
- invariant(_offset != -1 || !_file.is_open());
-
- if (_file.is_open()) {
- return;
- }
-
- _open();
- _file.exceptions(std::ios::failbit | std::ios::badbit);
- _offset = boost::filesystem::file_size(_path);
-}
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/file.h b/src/mongo/db/sorter/file.h
deleted file mode 100644
index e47bf353bef..00000000000
--- a/src/mongo/db/sorter/file.h
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/filesystem/path.hpp>
-#include <fstream>
-#include <string>
-#include <utility>
-
-#include "mongo/util/assert_util.h"
-
-namespace mongo::sorter {
-/**
- * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or
- * reading without any writing), but does not support writing after any reading has been done.
- */
-class File {
-public:
- File(std::string path) : _path(std::move(path)) {
- invariant(!_path.empty());
- }
-
- ~File();
-
- const boost::filesystem::path& path() const {
- return _path;
- }
-
- /**
- * Signals that the on-disk file should not be cleaned up.
- */
- void keep() {
- _keep = true;
- };
-
- /**
- * Reads the requested data from the file. Cannot write more to the file once this has been
- * called.
- */
- void read(std::streamoff offset, std::streamsize size, void* out);
-
- /**
- * Writes the given data to the end of the file. Cannot be called after reading.
- */
- void write(const char* data, std::streamsize size);
-
- /**
- * Returns the current offset of the end of the file. Cannot be called after reading.
- */
- std::streamoff currentOffset();
-
-private:
- void _open();
-
- void _ensureOpenForWriting();
-
- boost::filesystem::path _path;
- std::fstream _file;
-
- // The current offset of the end of the file, or -1 if the file either has not yet been
- // opened or is already being read.
- std::streamoff _offset = -1;
-
- // Whether to keep the on-disk file even after this in-memory object has been destructed.
- bool _keep = false;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/file_iterator.h b/src/mongo/db/sorter/file_iterator.h
deleted file mode 100644
index 3e740584353..00000000000
--- a/src/mongo/db/sorter/file_iterator.h
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorted_data_iterator.h"
-
-#include "mongo/db/sorter/compression.h"
-#include "mongo/db/sorter/file.h"
-#include "mongo/db/sorter/util.h"
-
-namespace mongo::sorter {
-/**
- * Iterates over a sorted range within a file.
- */
-template <typename Key, typename Value>
-class FileIterator : public SortedDataIterator<Key, Value> {
-public:
- using Base = SortedDataIterator<Key, Value>;
- using Data = typename Base::Data;
- using Settings = typename Base::Settings;
-
- FileIterator(File* file,
- std::streamoff fileStartOffset,
- std::streamoff fileEndOffset,
- const uint32_t checksum,
- const Settings& settings,
- const boost::optional<std::string>& dbName)
- : _settings(settings),
- _file(file),
- _fileStartOffset(fileStartOffset),
- _fileCurrentOffset(fileStartOffset),
- _fileEndOffset(fileEndOffset),
- _originalChecksum(checksum),
- _dbName(dbName) {}
-
- ~FileIterator() {
- // If the file iterator reads through all data objects, we can ensure non-corrupt data by
- // comparing the newly calculated checksum with the original checksum from the data written
- // to disk. Some iterators do not read back all data from the file, which prohibits the
- // _afterReadChecksum from obtaining all the information needed. Thus, we only fassert if
- // all data that was written to disk is read back and the checksums are not equivalent.
- if (!more() && _bufferReader->atEof() && (_originalChecksum != _afterReadChecksum)) {
- fassert(31182,
- Status(ErrorCodes::Error::ChecksumMismatch,
- "Data read from disk does not match what was written to disk. Possible "
- "corruption of data."));
- }
- }
-
- bool more() const {
- return !_bufferReader || !_bufferReader->atEof() || _fileCurrentOffset < _fileEndOffset;
- }
-
- Data next() {
- if (!_bufferReader || _bufferReader->atEof()) {
- _fillBuffer();
- }
-
- const char* startOfNewData = static_cast<const char*>(_bufferReader->pos());
-
- // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
- // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
- // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
- // the Data constructor
- auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
- auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
-
- // The difference of _bufferReader's position before and after reading the data
- // will provide the length of the data that was just read.
- const char* endOfNewData = static_cast<const char*>(_bufferReader->pos());
-
- _afterReadChecksum =
- addDataToChecksum(startOfNewData, endOfNewData - startOfNewData, _afterReadChecksum);
-
- return {std::move(first), std::move(second)};
- }
-
- SorterRange getRange() const override {
- return {_fileStartOffset, _fileEndOffset, _originalChecksum};
- }
-
-private:
- /**
- * Fills the buffer by reading from disk.
- */
- void _fillBuffer() {
- int32_t rawSize;
- _read(&rawSize, sizeof(rawSize));
-
- // Negative size means compressed.
- const bool compressed = rawSize < 0;
- int32_t blockSize = std::abs(rawSize);
-
- _buffer.reset(new char[blockSize]);
- _read(_buffer.get(), blockSize);
-
- if (auto encryptionHooks = getEncryptionHooksIfEnabled()) {
- std::unique_ptr<char[]> out(new char[blockSize]);
- size_t outLen;
- Status status =
- encryptionHooks->unprotectTmpData(reinterpret_cast<const uint8_t*>(_buffer.get()),
- blockSize,
- reinterpret_cast<uint8_t*>(out.get()),
- blockSize,
- &outLen,
- _dbName);
- uassert(28841,
- str::stream() << "Failed to unprotect data: " << status.toString(),
- status.isOK());
- blockSize = outLen;
- _buffer.swap(out);
- }
-
- if (!compressed) {
- _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
- return;
- }
-
- dassert(isValidCompressedBuffer(_buffer.get(), blockSize));
-
- size_t uncompressedSize;
- uassert(17061,
- "Failed to get uncompressed size",
- getUncompressedSize(_buffer.get(), blockSize, &uncompressedSize));
-
- std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
- uassert(17062,
- "Failed to decompress",
- decompress(_buffer.get(), blockSize, decompressionBuffer.get()));
-
- // Hold on to decompressed data and throw out compressed data at block exit.
- _buffer.swap(decompressionBuffer);
- _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
- }
-
- /**
- * Reads data from disk.
- */
- void _read(void* out, size_t size) {
- invariant(_fileCurrentOffset < _fileEndOffset,
- str::stream() << "Current file offset (" << _fileCurrentOffset
- << ") greater than end offset (" << _fileEndOffset << ")");
-
- _file->read(_fileCurrentOffset, size, out);
- _fileCurrentOffset += size;
- }
-
- const Settings _settings;
-
- std::unique_ptr<char[]> _buffer;
- std::unique_ptr<BufReader> _bufferReader;
-
- File* _file; // File containing the sorted data range.
- std::streamoff _fileStartOffset; // File offset at which the sorted data range starts.
- std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from.
- std::streamoff _fileEndOffset; // File offset at which the sorted data range ends.
-
- // Checksum value retrieved from SortedFileWriter that was calculated as data was spilled
- // to disk. This is not modified, and is only used for comparison against _afterReadChecksum
- // when the FileIterator is exhausted to ensure no data corruption.
- const uint32_t _originalChecksum;
-
- // Checksum value that is updated with each read of a data object from disk. We can compare
- // this value with _originalChecksum to check for data corruption if and only if the
- // FileIterator is exhausted.
- uint32_t _afterReadChecksum = 0;
-
- boost::optional<std::string> _dbName;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/in_mem_iterator.h b/src/mongo/db/sorter/in_mem_iterator.h
deleted file mode 100644
index 03f1ed98dd7..00000000000
--- a/src/mongo/db/sorter/in_mem_iterator.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorted_data_iterator.h"
-
-namespace mongo::sorter {
-/**
- * Returns results from sorted in-memory storage.
- */
-template <typename Key, typename Value>
-class InMemIterator : public SortedDataIterator<Key, Value> {
-public:
- using Base = SortedDataIterator<Key, Value>;
- using Data = typename Base::Data;
-
- using Base::_returnPolicy;
-
- InMemIterator(std::vector<Data>& data, typename Base::ReturnPolicy returnPolicy)
- : Base(returnPolicy), _it(data.begin()), _end(data.end()) {}
-
- bool more() const {
- return _it != _end;
- }
-
- Data next() {
- switch (_returnPolicy) {
- case Base::ReturnPolicy::kCopy:
- return *_it++;
- case Base::ReturnPolicy::kMove:
- return std::move(*_it++);
- }
- MONGO_UNREACHABLE;
- }
-
-private:
- typename std::vector<Data>::iterator _it;
- typename std::vector<Data>::iterator _end;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/limit_one_sorter.h b/src/mongo/db/sorter/limit_one_sorter.h
deleted file mode 100644
index d16c829a59c..00000000000
--- a/src/mongo/db/sorter/limit_one_sorter.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorter.h"
-
-#include "mongo/db/sorter/single_elem_iterator.h"
-#include "mongo/db/sorter/util.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class LimitOneSorter : public Sorter<Key, Value> {
-public:
- using Base = Sorter<Key, Value>;
- using Data = typename Base::Data;
- using Iterator = typename Base::Iterator;
- using CompFn = typename Base::CompFn;
-
- using Base::_comp;
- using Base::_done;
- using Base::_numSorted;
- using Base::_totalDataSizeSorted;
-
- explicit LimitOneSorter(const CompFn& comp) : Base(comp) {}
-
- void add(const Key& key, const Value& val) {
- invariant(!_done);
-
- ++_numSorted;
- _totalDataSizeSorted += key.memUsageForSorter() + val.memUsageForSorter();
-
- Data contender{key, val};
-
- if (_haveData) {
- dassertCompIsSane(_comp, _best, contender);
- if (_comp(_best, contender) <= 0) {
- return;
- }
- } else {
- _haveData = true;
- }
-
- _best = {contender.first.getOwned(), contender.second.getOwned()};
- }
-
- std::unique_ptr<Iterator> done(typename Iterator::ReturnPolicy returnPolicy) {
- _done = true;
- return _haveData ? std::make_unique<SingleElemIterator<Key, Value>>(_best, returnPolicy)
- : std::make_unique<SingleElemIterator<Key, Value>>(returnPolicy);
- }
-
-private:
- Data _best;
- bool _haveData = false;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/merge_iterator.h b/src/mongo/db/sorter/merge_iterator.h
deleted file mode 100644
index d09b15185e0..00000000000
--- a/src/mongo/db/sorter/merge_iterator.h
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorted_data_iterator.h"
-#include "mongo/db/sorter/util.h"
-
-namespace mongo::sorter {
-/**
- * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
- * ranges within the same file.
- */
-template <typename Key, typename Value>
-class MergeIterator : public SortedDataIterator<Key, Value> {
-public:
- using Base = SortedDataIterator<Key, Value>;
- using Data = typename Base::Data;
- using CompFn = std::function<int(const Data&, const Data&)>;
-
- MergeIterator(const std::vector<std::unique_ptr<Base>>& iters,
- unsigned long long limit,
- const CompFn& comp)
- : _remaining(limit ? limit : std::numeric_limits<unsigned long long>::max()),
- _greater([comp](const std::unique_ptr<Stream>& lhs, const std::unique_ptr<Stream>& rhs) {
- dassertCompIsSane(comp, lhs->peek(), rhs->peek());
- int result = comp(lhs->peek(), rhs->peek());
- return result ? result > 0 : lhs->num() > rhs->num();
- }) {
- for (size_t i = 0; i < iters.size(); ++i) {
- if (iters[i]->more()) {
- _heap.push_back(std::make_unique<Stream>(iters[i].get(), i));
- }
- }
-
- if (_heap.empty()) {
- _remaining = 0;
- return;
- }
-
- std::make_heap(_heap.begin(), _heap.end(), _greater);
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- _current = std::move(_heap.back());
- _heap.pop_back();
- }
-
- ~MergeIterator() {
- _current.reset();
- _heap.clear();
- }
-
- bool more() const {
- return _remaining > 0 && (_first || !_heap.empty() || _current->more());
- }
-
- Data next() {
- invariant(more());
-
- --_remaining;
-
- if (_first) {
- _first = false;
- return _current->current();
- }
-
- if (!_current->advance()) {
- invariant(!_heap.empty());
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- _current = std::move(_heap.back());
- _heap.pop_back();
- } else if (!_heap.empty() && _greater(_current, _heap.front())) {
- std::pop_heap(_heap.begin(), _heap.end(), _greater);
- _current.swap(_heap.back());
- std::push_heap(_heap.begin(), _heap.end(), _greater);
- }
-
- return _current->current();
- }
-
-
-private:
- class Stream {
- public:
- Stream(Base* data, size_t num) : _data(data), _current(data->next()), _num(num) {}
-
- const Data& peek() const {
- return _current;
- }
-
- Data current() {
- return std::move(_current);
- }
-
- bool more() const {
- return _data->more();
- }
-
- bool advance() {
- if (!_data->more()) {
- return false;
- }
-
- _current = std::move(_data->next());
- return true;
- }
-
- size_t num() const {
- return _num;
- }
-
- private:
- Base* _data;
- Data _current;
- size_t _num;
- };
-
- unsigned long long _remaining;
- bool _first = true;
- std::unique_ptr<Stream> _current;
- std::vector<std::unique_ptr<Stream>> _heap; // MinHeap
- std::function<bool(const std::unique_ptr<Stream>& lhs, const std::unique_ptr<Stream>& rhs)>
- _greater;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/no_limit_sorter.h b/src/mongo/db/sorter/no_limit_sorter.h
deleted file mode 100644
index 71ee3070ad3..00000000000
--- a/src/mongo/db/sorter/no_limit_sorter.h
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/filesystem.hpp>
-
-#include "mongo/db/sorter/spillable_sorter.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class NoLimitSorter : public SpillableSorter<Key, Value> {
-public:
- using Base = SpillableSorter<Key, Value>;
- using Data = typename Base::Data;
- using Iterator = typename Base::Iterator;
- using CompFn = typename Base::CompFn;
- using Settings = typename Base::Settings;
-
- using Base::_data;
- using Base::_done;
- using Base::_file;
- using Base::_less;
- using Base::_memUsed;
- using Base::_numSorted;
- using Base::_options;
- using Base::_settings;
- using Base::_spill;
- using Base::_spilled;
- using Base::_totalDataSizeSorted;
-
- NoLimitSorter(StringData name,
- const Options& options,
- const CompFn& comp,
- const Settings& settings)
- : Base(name, options, comp, settings) {
- invariant(options.limit == 0);
- }
-
- NoLimitSorter(const std::string& fileName,
- const std::vector<SorterRange>& ranges,
- const Options& options,
- const CompFn& comp,
- const Settings& settings)
- : Base(options, comp, settings, fileName) {
- uassert(16815,
- str::stream() << "Unexpected empty file: " << _file->path().string(),
- ranges.empty() || boost::filesystem::file_size(_file->path()) != 0);
-
- _spilled.reserve(ranges.size());
- std::transform(ranges.begin(),
- ranges.end(),
- std::back_inserter(_spilled),
- [this](const SorterRange& range) {
- return std::make_unique<FileIterator<Key, Value>>(_file.get(),
- range.getStartOffset(),
- range.getEndOffset(),
- range.getChecksum(),
- _settings,
- _options.dbName);
- });
- }
-
- void add(const Key& key, const Value& value) {
- addOwned(key.getOwned(), value.getOwned());
- }
-
- void addOwned(Key&& key, Value&& value) override {
- invariant(!_done);
-
- ++_numSorted;
-
- auto memUsage = key.memUsageForSorter() + value.memUsageForSorter();
- _memUsed += memUsage;
- _totalDataSizeSorted += memUsage;
-
- _data.emplace_back(std::move(key), std::move(value));
-
- if (_memUsed > _options.maxMemoryUsageBytes) {
- _spill();
- }
- }
-
- typename Base::PersistedState persistDataForShutdown() override {
- _spill();
- _file->keep();
-
- std::vector<SorterRange> ranges;
- ranges.reserve(_spilled.size());
- std::transform(_spilled.begin(),
- _spilled.end(),
- std::back_inserter(ranges),
- [](const auto& it) { return it->getRange(); });
-
- return {_file->path().filename().string(), std::move(ranges)};
- }
-
-private:
- void _sort() {
- std::stable_sort(_data.begin(), _data.end(), _less);
- }
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/options.h b/src/mongo/db/sorter/options.h
deleted file mode 100644
index ba5f261619d..00000000000
--- a/src/mongo/db/sorter/options.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/optional.hpp>
-
-namespace mongo::sorter {
-struct Options {
- // The number of KV pairs to be returned. 0 indicates no limit.
- unsigned long long limit = 0;
-
- // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate.
- size_t maxMemoryUsageBytes = 64 * 1024 * 1024;
-
- // Whether we are allowed to spill to disk. If this is none and in-memory exceeds
- // maxMemoryUsageBytes, we will uassert.
- boost::optional<std::string> tempDir;
-
- // In case the sorter spills encrypted data to disk that must be readable even after process
- // restarts, it must encrypt with a persistent key. This key is accessed using the database
- // name that the sorted collection lives in. If encryption is enabled and dbName is boost::none,
- // a temporary key is used.
- boost::optional<std::string> dbName;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/single_elem_iterator.h b/src/mongo/db/sorter/single_elem_iterator.h
deleted file mode 100644
index 2764630a78a..00000000000
--- a/src/mongo/db/sorter/single_elem_iterator.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorted_data_iterator.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class SingleElemIterator : public SortedDataIterator<Key, Value> {
-public:
- using Base = SortedDataIterator<Key, Value>;
- using Data = typename Base::Data;
-
- using Base::_returnPolicy;
-
- SingleElemIterator(typename Base::ReturnPolicy returnPolicy) : Base(returnPolicy) {}
-
- SingleElemIterator(Data& data, typename Base::ReturnPolicy returnPolicy)
- : Base(returnPolicy), _data(&data) {}
-
- bool more() const {
- return _data;
- }
-
- Data next() {
- switch (_returnPolicy) {
- case Base::ReturnPolicy::kCopy:
- return *std::exchange(_data, nullptr);
- case Base::ReturnPolicy::kMove:
- return std::move(*std::exchange(_data, nullptr));
- }
- MONGO_UNREACHABLE;
- }
-
-private:
- Data* _data = nullptr;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/sorted_data_iterator.h b/src/mongo/db/sorter/sorted_data_iterator.h
deleted file mode 100644
index a63e77ddccf..00000000000
--- a/src/mongo/db/sorter/sorted_data_iterator.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-#include "mongo/db/sorter/options.h"
-#include "mongo/db/sorter/sorter_gen.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class SortedDataIterator {
-public:
- using Data = std::pair<Key, Value>;
- using Settings = std::pair<typename Key::SorterDeserializeSettings,
- typename Value::SorterDeserializeSettings>;
-
- enum class ReturnPolicy {
- kCopy,
- kMove,
- };
-
- SortedDataIterator(ReturnPolicy returnPolicy = ReturnPolicy::kMove)
- : _returnPolicy(returnPolicy) {}
-
- SortedDataIterator(const SortedDataIterator&) = delete;
- SortedDataIterator& operator=(const SortedDataIterator&) = delete;
-
- virtual ~SortedDataIterator() {}
-
- virtual bool more() const = 0;
-
- virtual Data next() = 0;
-
- virtual SorterRange getRange() const {
- MONGO_UNREACHABLE;
- }
-
-protected:
- ReturnPolicy _returnPolicy;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/sorted_file_writer.h b/src/mongo/db/sorter/sorted_file_writer.h
deleted file mode 100644
index ba82676e49b..00000000000
--- a/src/mongo/db/sorter/sorted_file_writer.h
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/file.h"
-#include "mongo/db/sorter/file_iterator.h"
-#include "mongo/db/sorter/options.h"
-#include "mongo/db/sorter/sorted_data_iterator.h"
-#include "mongo/db/sorter/util.h"
-#include "mongo/s/is_mongos.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class SortedFileWriter {
-public:
- using Iterator = SortedDataIterator<Key, Value>;
- using Settings = std::pair<typename Key::SorterDeserializeSettings,
- typename Value::SorterDeserializeSettings>;
-
- SortedFileWriter(File* file,
- const boost::optional<std::string>& dbName = boost::none,
- const Settings& settings = Settings())
- : _settings(settings),
- _file(file),
- _fileStartOffset(_file->currentOffset()),
- _dbName(dbName) {
- invariant(!isMongos());
- }
-
- SortedFileWriter(const SortedFileWriter&) = delete;
- SortedFileWriter& operator=(const SortedFileWriter&) = delete;
-
- void addAlreadySorted(const Key& key, const Value& val) {
- // Offset that points to the place in the buffer where a new data object will be stored.
- int nextObjPos = _buffer.len();
-
- // Add serialized key and value to the buffer.
- key.serializeForSorter(_buffer);
- val.serializeForSorter(_buffer);
-
- // Serializing the key and value grows the buffer, but _buffer.buf() still points to the
- // beginning. Use _buffer.len() to determine portion of buffer containing new datum.
- _checksum =
- addDataToChecksum(_buffer.buf() + nextObjPos, _buffer.len() - nextObjPos, _checksum);
-
- if (_buffer.len() > 64 * 1024) {
- _spill();
- }
- }
-
- std::unique_ptr<Iterator> done() {
- _spill();
-
- return std::make_unique<FileIterator<Key, Value>>(
- _file, _fileStartOffset, _file->currentOffset(), _checksum, _settings, _dbName);
- }
-
-private:
- void _spill() {
- int32_t size = _buffer.len();
- char* outBuffer = _buffer.buf();
-
- if (size == 0) {
- return;
- }
-
- std::string compressed;
- compress(outBuffer, size, &compressed);
- invariant(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));
-
- const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9);
- if (shouldCompress) {
- size = compressed.size();
- outBuffer = const_cast<char*>(compressed.data());
- }
-
- std::unique_ptr<char[]> out;
- if (auto encryptionHooks = getEncryptionHooksIfEnabled()) {
- size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer();
- out.reset(new char[protectedSizeMax]);
- size_t resultLen;
- Status status =
- encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer),
- size,
- reinterpret_cast<uint8_t*>(out.get()),
- protectedSizeMax,
- &resultLen,
- _dbName);
- uassert(28842,
- str::stream() << "Failed to compress data: " << status.toString(),
- status.isOK());
- outBuffer = out.get();
- size = resultLen;
- }
-
- // Negative size means compressed.
- size = shouldCompress ? -size : size;
- _file->write(reinterpret_cast<const char*>(&size), sizeof(size));
- _file->write(outBuffer, std::abs(size));
-
- _buffer.reset();
- }
-
- const Settings _settings;
- File* _file;
- BufBuilder _buffer;
-
- // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator
- // to ensure data has not been corrupted after reading from disk.
- uint32_t _checksum = 0;
-
- // Tracks where in the file we started writing the sorted data range so that the information can
- // be given to the Iterator in done().
- std::streamoff _fileStartOffset;
-
- boost::optional<std::string> _dbName;
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
new file mode 100644
index 00000000000..e8fece6cff2
--- /dev/null
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -0,0 +1,1231 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+/**
+ * This is the implementation for the Sorter.
+ *
+ * It is intended to be included in other cpp files like this:
+ *
+ * #include <normal/include/files.h>
+ *
+ * #include "mongo/db/sorter/sorter.h"
+ *
+ * namespace mongo {
+ * // Your code
+ * }
+ *
+ * #include "mongo/db/sorter/sorter.cpp"
+ * MONGO_CREATE_SORTER(MyKeyType, MyValueType, MyComparatorType);
+ *
+ * Do this once for each unique set of parameters to MONGO_CREATE_SORTER.
+ */
+
+#include "mongo/db/sorter/sorter.h"
+
+#include <boost/filesystem/operations.hpp>
+#include <snappy.h>
+#include <vector>
+
+#include "mongo/base/string_data.h"
+#include "mongo/config.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/encryption_hooks.h"
+#include "mongo/db/storage/storage_options.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/overflow_arithmetic.h"
+#include "mongo/s/is_mongos.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/destructor_guard.h"
+#include "mongo/util/str.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * Calculates and returns a new murmur hash value based on the prior murmur hash and a new piece
+ * of data.
+ */
+uint32_t addDataToChecksum(const void* startOfData, size_t sizeOfData, uint32_t checksum) {
+ unsigned newChecksum;
+ MurmurHash3_x86_32(startOfData, sizeOfData, checksum, &newChecksum);
+ return newChecksum;
+}
+
+void checkNoExternalSortOnMongos(const SortOptions& opts) {
+ // This should be checked by consumers, but if it isn't try to fail early.
+ uassert(16947,
+ "Attempting to use external sort from mongos. This is not allowed.",
+ !(isMongos() && opts.extSortAllowed));
+}
+
+/**
+ * Returns the current EncryptionHooks registered with the global service context.
+ * Returns nullptr if the service context is not available; or if the EncyptionHooks
+ * registered is not enabled.
+ */
+EncryptionHooks* getEncryptionHooksIfEnabled() {
+ // Some tests may not run with a global service context.
+ if (!hasGlobalServiceContext()) {
+ return nullptr;
+ }
+ auto service = getGlobalServiceContext();
+ auto encryptionHooks = EncryptionHooks::get(service);
+ if (!encryptionHooks->enabled()) {
+ return nullptr;
+ }
+ return encryptionHooks;
+}
+
+} // namespace
+
+namespace sorter {
+
+// We need to use the "real" errno everywhere, not GetLastError() on Windows
+inline std::string myErrnoWithDescription() {
+ int errnoCopy = errno;
+ StringBuilder sb;
+ sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy);
+ return sb.str();
+}
+
+template <typename Data, typename Comparator>
+void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) {
+#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER)
+ // MSVC++ already does similar verification in debug mode in addition to using
+ // algorithms that do more comparisons. Doing our own verification in addition makes
+ // debug builds considerably slower without any additional safety.
+
+ // test reversed comparisons
+ const int regular = comp(lhs, rhs);
+ if (regular == 0) {
+ invariant(comp(rhs, lhs) == 0);
+ } else if (regular < 0) {
+ invariant(comp(rhs, lhs) > 0);
+ } else {
+ invariant(comp(rhs, lhs) < 0);
+ }
+
+ // test reflexivity
+ invariant(comp(lhs, lhs) == 0);
+ invariant(comp(rhs, rhs) == 0);
+#endif
+}
+
+/**
+ * Returns results from sorted in-memory storage.
+ */
+template <typename Key, typename Value>
+class InMemIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
+
+ /// No data to iterate
+ InMemIterator() {}
+
+ /// Only a single value
+ InMemIterator(const Data& singleValue) : _data(1, singleValue) {}
+
+ /// Any number of values
+ template <typename Container>
+ InMemIterator(const Container& input) : _data(input.begin(), input.end()) {}
+
+ InMemIterator(std::deque<Data> data) : _data(std::move(data)) {}
+
+ void openSource() {}
+ void closeSource() {}
+
+ bool more() {
+ return !_data.empty();
+ }
+ Data next() {
+ Data out = std::move(_data.front());
+ _data.pop_front();
+ return out;
+ }
+
+private:
+ std::deque<Data> _data;
+};
+
+/**
+ * Returns results from a sorted range within a file. Each instance is given a file name and start
+ * and end offsets.
+ *
+ * This class is NOT responsible for file clean up / deletion. There are openSource() and
+ * closeSource() functions to ensure the FileIterator is not holding the file open when the file is
+ * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in
+ * use elsewhere.
+ */
+template <typename Key, typename Value>
+class FileIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
+ typedef std::pair<Key, Value> Data;
+
+ FileIterator(std::shared_ptr<typename Sorter<Key, Value>::File> file,
+ std::streamoff fileStartOffset,
+ std::streamoff fileEndOffset,
+ const Settings& settings,
+ const boost::optional<std::string>& dbName,
+ const uint32_t checksum)
+ : _settings(settings),
+ _file(std::move(file)),
+ _fileStartOffset(fileStartOffset),
+ _fileCurrentOffset(fileStartOffset),
+ _fileEndOffset(fileEndOffset),
+ _dbName(dbName),
+ _originalChecksum(checksum) {}
+
+ void openSource() {}
+
+ void closeSource() {
+ // If the file iterator reads through all data objects, we can ensure non-corrupt data
+ // by comparing the newly calculated checksum with the original checksum from the data
+ // written to disk. Some iterators do not read back all data from the file, which prohibits
+ // the _afterReadChecksum from obtaining all the information needed. Thus, we only fassert
+ // if all data that was written to disk is read back and the checksums are not equivalent.
+ if (_done && _bufferReader->atEof() && (_originalChecksum != _afterReadChecksum)) {
+ fassert(31182,
+ Status(ErrorCodes::Error::ChecksumMismatch,
+ "Data read from disk does not match what was written to disk. Possible "
+ "corruption of data."));
+ }
+ }
+
+ bool more() {
+ if (!_done)
+ _fillBufferIfNeeded(); // may change _done
+ return !_done;
+ }
+
+ Data next() {
+ invariant(!_done);
+ _fillBufferIfNeeded();
+
+ const char* startOfNewData = static_cast<const char*>(_bufferReader->pos());
+
+ // Note: calling read() on the _bufferReader buffer in the deserialize function advances the
+ // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function
+ // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into
+ // the Data constructor
+ auto first = Key::deserializeForSorter(*_bufferReader, _settings.first);
+ auto second = Value::deserializeForSorter(*_bufferReader, _settings.second);
+
+ // The difference of _bufferReader's position before and after reading the data
+ // will provide the length of the data that was just read.
+ const char* endOfNewData = static_cast<const char*>(_bufferReader->pos());
+
+ _afterReadChecksum =
+ addDataToChecksum(startOfNewData, endOfNewData - startOfNewData, _afterReadChecksum);
+
+ return Data(std::move(first), std::move(second));
+ }
+
+ SorterRange getRange() const {
+ return {_fileStartOffset, _fileEndOffset, _originalChecksum};
+ }
+
+private:
+ /**
+ * Attempts to refill the _bufferReader if it is empty. Expects _done to be false.
+ */
+ void _fillBufferIfNeeded() {
+ invariant(!_done);
+
+ if (!_bufferReader || _bufferReader->atEof())
+ _fillBufferFromDisk();
+ }
+
+ /**
+ * Tries to read from disk and places any results in _bufferReader. If there is no more data to
+ * read, then _done is set to true and the function returns immediately.
+ */
+ void _fillBufferFromDisk() {
+ int32_t rawSize;
+ _read(&rawSize, sizeof(rawSize));
+ if (_done)
+ return;
+
+ // negative size means compressed
+ const bool compressed = rawSize < 0;
+ int32_t blockSize = std::abs(rawSize);
+
+ _buffer.reset(new char[blockSize]);
+ _read(_buffer.get(), blockSize);
+ uassert(16816, "file too short?", !_done);
+
+ if (auto encryptionHooks = getEncryptionHooksIfEnabled()) {
+ std::unique_ptr<char[]> out(new char[blockSize]);
+ size_t outLen;
+ Status status =
+ encryptionHooks->unprotectTmpData(reinterpret_cast<const uint8_t*>(_buffer.get()),
+ blockSize,
+ reinterpret_cast<uint8_t*>(out.get()),
+ blockSize,
+ &outLen,
+ _dbName);
+ uassert(28841,
+ str::stream() << "Failed to unprotect data: " << status.toString(),
+ status.isOK());
+ blockSize = outLen;
+ _buffer.swap(out);
+ }
+
+ if (!compressed) {
+ _bufferReader.reset(new BufReader(_buffer.get(), blockSize));
+ return;
+ }
+
+ dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize));
+
+ size_t uncompressedSize;
+ uassert(17061,
+ "couldn't get uncompressed length",
+ snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize));
+
+ std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]);
+ uassert(17062,
+ "decompression failed",
+ snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get()));
+
+ // hold on to decompressed data and throw out compressed data at block exit
+ _buffer.swap(decompressionBuffer);
+ _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize));
+ }
+
+ /**
+ * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset.
+ */
+ void _read(void* out, size_t size) {
+ if (_fileCurrentOffset == _fileEndOffset) {
+ _done = true;
+ return;
+ }
+
+ invariant(_fileCurrentOffset < _fileEndOffset,
+ str::stream() << "Current file offset (" << _fileCurrentOffset
+ << ") greater than end offset (" << _fileEndOffset << ")");
+
+ _file->read(_fileCurrentOffset, size, out);
+ _fileCurrentOffset += size;
+ }
+
+ const Settings _settings;
+ bool _done = false;
+
+ std::unique_ptr<char[]> _buffer;
+ std::unique_ptr<BufReader> _bufferReader;
+ std::shared_ptr<typename Sorter<Key, Value>::File>
+ _file; // File containing the sorted data range.
+ std::streamoff _fileStartOffset; // File offset at which the sorted data range starts.
+ std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from.
+ std::streamoff _fileEndOffset; // File offset at which the sorted data range ends.
+ boost::optional<std::string> _dbName;
+
+ // Checksum value that is updated with each read of a data object from disk. We can compare
+ // this value with _originalChecksum to check for data corruption if and only if the
+ // FileIterator is exhausted.
+ uint32_t _afterReadChecksum = 0;
+
+ // Checksum value retrieved from SortedFileWriter that was calculated as data was spilled
+ // to disk. This is not modified, and is only used for comparison against _afterReadChecksum
+ // when the FileIterator is exhausted to ensure no data corruption.
+ const uint32_t _originalChecksum;
+};
+
+/**
+ * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted
+ * ranges within the same file. This class is given the data source file name upon construction and
+ * is responsible for deleting the data source file upon destruction.
+ */
+template <typename Key, typename Value, typename Comparator>
+class MergeIterator : public SortIteratorInterface<Key, Value> {
+public:
+ typedef SortIteratorInterface<Key, Value> Input;
+ typedef std::pair<Key, Value> Data;
+
+ MergeIterator(const std::vector<std::shared_ptr<Input>>& iters,
+ const SortOptions& opts,
+ const Comparator& comp)
+ : _opts(opts),
+ _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()),
+ _first(true),
+ _greater(comp) {
+ for (size_t i = 0; i < iters.size(); i++) {
+ iters[i]->openSource();
+ if (iters[i]->more()) {
+ _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i]));
+ } else {
+ iters[i]->closeSource();
+ }
+ }
+
+ if (_heap.empty()) {
+ _remaining = 0;
+ return;
+ }
+
+ std::make_heap(_heap.begin(), _heap.end(), _greater);
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ _current = _heap.back();
+ _heap.pop_back();
+ }
+
+ ~MergeIterator() {
+ _current.reset();
+ _heap.clear();
+ }
+
+ void openSource() {}
+ void closeSource() {}
+
+ bool more() {
+ if (_remaining > 0 && (_first || !_heap.empty() || _current->more()))
+ return true;
+
+ _remaining = 0;
+ return false;
+ }
+
+ Data next() {
+ verify(_remaining);
+
+ _remaining--;
+
+ if (_first) {
+ _first = false;
+ return _current->current();
+ }
+
+ if (!_current->advance()) {
+ verify(!_heap.empty());
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ _current = _heap.back();
+ _heap.pop_back();
+ } else if (!_heap.empty() && _greater(_current, _heap.front())) {
+ std::pop_heap(_heap.begin(), _heap.end(), _greater);
+ std::swap(_current, _heap.back());
+ std::push_heap(_heap.begin(), _heap.end(), _greater);
+ }
+
+ return _current->current();
+ }
+
+
+private:
+ /**
+ * Data iterator over an Input stream.
+ *
+ * This class is responsible for closing the Input source upon destruction, unfortunately,
+ * because that is the path of least resistence to a design change requiring MergeIterator to
+ * handle eventual deletion of said Input source.
+ */
+ class Stream {
+ public:
+ Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest)
+ : fileNum(fileNum), _current(first), _rest(rest) {}
+
+ ~Stream() {
+ _rest->closeSource();
+ }
+
+ const Data& current() const {
+ return _current;
+ }
+ bool more() {
+ return _rest->more();
+ }
+ bool advance() {
+ if (!_rest->more())
+ return false;
+
+ _current = _rest->next();
+ return true;
+ }
+
+ const size_t fileNum;
+
+ private:
+ Data _current;
+ std::shared_ptr<Input> _rest;
+ };
+
+ class STLComparator { // uses greater rather than less-than to maintain a MinHeap
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+
+ template <typename Ptr>
+ bool operator()(const Ptr& lhs, const Ptr& rhs) const {
+ // first compare data
+ dassertCompIsSane(_comp, lhs->current(), rhs->current());
+ int ret = _comp(lhs->current(), rhs->current());
+ if (ret)
+ return ret > 0;
+
+ // then compare fileNums to ensure stability
+ return lhs->fileNum > rhs->fileNum;
+ }
+
+ private:
+ const Comparator _comp;
+ };
+
+ SortOptions _opts;
+ unsigned long long _remaining;
+ bool _first;
+ std::shared_ptr<Stream> _current;
+ std::vector<std::shared_ptr<Stream>> _heap; // MinHeap
+ STLComparator _greater; // named so calls make sense
+};
+
+template <typename Key, typename Value, typename Comparator>
+class NoLimitSorter : public Sorter<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
+
+ NoLimitSorter(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) {
+ invariant(opts.limit == 0);
+ }
+
+ NoLimitSorter(const std::string& fileName,
+ const std::vector<SorterRange>& ranges,
+ const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : Sorter<Key, Value>(opts, fileName), _comp(comp), _settings(settings) {
+ invariant(opts.extSortAllowed);
+
+ uassert(16815,
+ str::stream() << "Unexpected empty file: " << this->_file->path().string(),
+ ranges.empty() || boost::filesystem::file_size(this->_file->path()) != 0);
+
+ this->_iters.reserve(ranges.size());
+ std::transform(ranges.begin(),
+ ranges.end(),
+ std::back_inserter(this->_iters),
+ [this](const SorterRange& range) {
+ return std::make_shared<sorter::FileIterator<Key, Value>>(
+ this->_file,
+ range.getStartOffset(),
+ range.getEndOffset(),
+ this->_settings,
+ this->_opts.dbName,
+ range.getChecksum());
+ });
+ }
+
+ void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
+ _data.emplace_back(key.getOwned(), val.getOwned());
+
+ auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
+ _memUsed += memUsage;
+ this->_totalDataSizeSorted += memUsage;
+
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
+ spill();
+ }
+
+ void emplace(Key&& key, Value&& val) override {
+ invariant(!_done);
+
+ auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
+ _memUsed += memUsage;
+ this->_totalDataSizeSorted += memUsage;
+
+ _data.emplace_back(std::move(key), std::move(val));
+
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
+ spill();
+ }
+
+ Iterator* done() {
+ invariant(!std::exchange(_done, true));
+
+ if (this->_iters.empty()) {
+ sort();
+ if (this->_opts.moveSortedDataIntoIterator) {
+ return new InMemIterator<Key, Value>(std::move(_data));
+ }
+ return new InMemIterator<Key, Value>(_data);
+ }
+
+ spill();
+ return Iterator::merge(this->_iters, this->_opts, _comp);
+ }
+
+private:
+ class STLComparator {
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator()(const Data& lhs, const Data& rhs) const {
+ dassertCompIsSane(_comp, lhs, rhs);
+ return _comp(lhs, rhs) < 0;
+ }
+
+ private:
+ const Comparator& _comp;
+ };
+
+ void sort() {
+ STLComparator less(_comp);
+ std::stable_sort(_data.begin(), _data.end(), less);
+ this->_numSorted += _data.size();
+ }
+
+ void spill() {
+ if (_data.empty())
+ return;
+
+ if (!this->_opts.extSortAllowed) {
+ // This error message only applies to sorts from user queries made through the find or
+ // aggregation commands. Other clients, such as bulk index builds, should suppress this
+ // error, either by allowing external sorting or by catching and throwing a more
+ // appropriate error.
+ uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ str::stream()
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting.");
+ }
+
+ sort();
+
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
+ for (; !_data.empty(); _data.pop_front()) {
+ writer.addAlreadySorted(_data.front().first, _data.front().second);
+ }
+ Iterator* iteratorPtr = writer.done();
+
+ this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
+
+ _memUsed = 0;
+ }
+
+ const Comparator _comp;
+ const Settings _settings;
+ bool _done = false;
+ size_t _memUsed = 0;
+ std::deque<Data> _data; // Data that has not been spilled.
+};
+
+template <typename Key, typename Value, typename Comparator>
+class LimitOneSorter : public Sorter<Key, Value> {
+ // Since this class is only used for limit==1, it omits all logic to
+ // spill to disk and only tracks memory usage if explicitly requested.
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+
+ LimitOneSorter(const SortOptions& opts, const Comparator& comp)
+ : _comp(comp), _haveData(false) {
+ verify(opts.limit == 1);
+ }
+
+ void add(const Key& key, const Value& val) {
+ Data contender(key, val);
+
+ this->_numSorted += 1;
+ if (_haveData) {
+ dassertCompIsSane(_comp, _best, contender);
+ if (_comp(_best, contender) <= 0)
+ return; // not good enough
+ } else {
+ _haveData = true;
+ }
+
+ _best = {contender.first.getOwned(), contender.second.getOwned()};
+ }
+
+ Iterator* done() {
+ if (_haveData) {
+ if (this->_opts.moveSortedDataIntoIterator) {
+ return new InMemIterator<Key, Value>(std::move(_best));
+ }
+ return new InMemIterator<Key, Value>(_best);
+ } else {
+ return new InMemIterator<Key, Value>();
+ }
+ }
+
+private:
+ void spill() {
+ invariant(false, "LimitOneSorter does not spill to disk");
+ }
+
+ const Comparator _comp;
+ Data _best;
+ bool _haveData; // false at start, set to true on first call to add()
+};
+
+template <typename Key, typename Value, typename Comparator>
+class TopKSorter : public Sorter<Key, Value> {
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
+
+ TopKSorter(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings())
+ : Sorter<Key, Value>(opts),
+ _comp(comp),
+ _settings(settings),
+ _memUsed(0),
+ _haveCutoff(false),
+ _worstCount(0),
+ _medianCount(0) {
+ // This also *works* with limit==1 but LimitOneSorter should be used instead
+ invariant(opts.limit > 1);
+
+ // Preallocate a fixed sized vector of the required size if we don't expect it to have a
+ // major impact on our memory budget. This is the common case with small limits.
+ if (opts.limit <
+ std::min((opts.maxMemoryUsageBytes / 10) / sizeof(typename decltype(_data)::value_type),
+ _data.max_size())) {
+ _data.reserve(opts.limit);
+ }
+ }
+
+ void add(const Key& key, const Value& val) {
+ invariant(!_done);
+
+ this->_numSorted += 1;
+
+ STLComparator less(_comp);
+ Data contender(key, val);
+
+ if (_data.size() < this->_opts.limit) {
+ if (_haveCutoff && !less(contender, _cutoff))
+ return;
+
+ _data.emplace_back(contender.first.getOwned(), contender.second.getOwned());
+
+ auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
+ _memUsed += memUsage;
+ this->_totalDataSizeSorted += memUsage;
+
+ if (_data.size() == this->_opts.limit)
+ std::make_heap(_data.begin(), _data.end(), less);
+
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
+ spill();
+
+ return;
+ }
+
+ invariant(_data.size() == this->_opts.limit);
+
+ if (!less(contender, _data.front()))
+ return; // not good enough
+
+ // Remove the old worst pair and insert the contender, adjusting _memUsed
+
+ auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
+ _memUsed += memUsage;
+ this->_totalDataSizeSorted += memUsage;
+
+ _memUsed -= _data.front().first.memUsageForSorter();
+ _memUsed -= _data.front().second.memUsageForSorter();
+
+ std::pop_heap(_data.begin(), _data.end(), less);
+ _data.back() = {contender.first.getOwned(), contender.second.getOwned()};
+ std::push_heap(_data.begin(), _data.end(), less);
+
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
+ spill();
+ }
+
+ Iterator* done() {
+ if (this->_iters.empty()) {
+ sort();
+ if (this->_opts.moveSortedDataIntoIterator) {
+ return new InMemIterator<Key, Value>(std::move(_data));
+ }
+ return new InMemIterator<Key, Value>(_data);
+ }
+
+ spill();
+ Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp);
+ _done = true;
+ return iterator;
+ }
+
+private:
+ class STLComparator {
+ public:
+ explicit STLComparator(const Comparator& comp) : _comp(comp) {}
+ bool operator()(const Data& lhs, const Data& rhs) const {
+ dassertCompIsSane(_comp, lhs, rhs);
+ return _comp(lhs, rhs) < 0;
+ }
+
+ private:
+ const Comparator& _comp;
+ };
+
+ void sort() {
+ STLComparator less(_comp);
+
+ if (_data.size() == this->_opts.limit) {
+ std::sort_heap(_data.begin(), _data.end(), less);
+ } else {
+ std::stable_sort(_data.begin(), _data.end(), less);
+ }
+ }
+
+ // Can only be called after _data is sorted
+ void updateCutoff() {
+ // Theory of operation: We want to be able to eagerly ignore values we know will not
+ // be in the TopK result set by setting _cutoff to a value we know we have at least
+ // K values equal to or better than. There are two values that we track to
+ // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
+ // one of these values becomes the new _cutoff, its associated counter is reset to 0
+ // and a new value is chosen for that member the next time we spill.
+ //
+ // _worstSeen is the worst value we've seen so that all kept values are better than
+ // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
+ // reason to consider values worse than _worstSeen so it can become the new _cutoff.
+ // This technique is especially useful when the input is already roughly sorted (eg
+ // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
+ // that will exclude most later values, making the full TopK operation including
+ // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time.
+ //
+ // _lastMedian was the median of the _data in the first spill() either overall or
+ // following a promotion of _lastMedian to _cutoff. We count the number of kept
+ // values that are better than or equal to _lastMedian in _medianCount and can
+ // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming
+ // reasonable median selection (which should happen when the data is completely
+ // unsorted), after the first K spilled values, we will keep roughly 50% of the
+ // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
+ // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
+ // so the expected number of kept values is O(Log(N/K) * K). The final run time if
+ // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
+ // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
+ //
+ // This leaves a currently unoptimized worst case of data that is already roughly
+ // sorted, but in the wrong direction, such that the desired results are all the
+ // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
+ // should be trivially detectable, as a future optimization it might be nice to
+ // detect this case and reverse the direction of input (if possible) which would
+ // turn this into the best case described above.
+ //
+ // Pedantic notes: The time complexities above (which count number of comparisons)
+ // ignore the sorting of batches prior to spilling to disk since they make it more
+ // confusing without changing the results. If you want to add them back in, add an
+ // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
+ // all space complexities measure disk space rather than memory since this class is
+ // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
+
+ STLComparator less(_comp); // less is "better" for TopK.
+
+ // Pick a new _worstSeen or _lastMedian if should.
+ if (_worstCount == 0 || less(_worstSeen, _data.back())) {
+ _worstSeen = _data.back();
+ }
+ if (_medianCount == 0) {
+ size_t medianIndex = _data.size() / 2; // chooses the higher if size() is even.
+ _lastMedian = _data[medianIndex];
+ }
+
+ // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian.
+ _worstCount += _data.size(); // everything is better or equal
+ typename std::vector<Data>::iterator firstWorseThanLastMedian =
+ std::upper_bound(_data.begin(), _data.end(), _lastMedian, less);
+ _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);
+
+
+ // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should.
+ if (_worstCount >= this->_opts.limit) {
+ if (!_haveCutoff || less(_worstSeen, _cutoff)) {
+ _cutoff = _worstSeen;
+ _haveCutoff = true;
+ }
+ _worstCount = 0;
+ }
+ if (_medianCount >= this->_opts.limit) {
+ if (!_haveCutoff || less(_lastMedian, _cutoff)) {
+ _cutoff = _lastMedian;
+ _haveCutoff = true;
+ }
+ _medianCount = 0;
+ }
+ }
+
+ void spill() {
+ invariant(!_done);
+
+ if (_data.empty())
+ return;
+
+ if (!this->_opts.extSortAllowed) {
+ // This error message only applies to sorts from user queries made through the find or
+ // aggregation commands. Other clients should suppress this error, either by allowing
+ // external sorting or by catching and throwing a more appropriate error.
+ uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ str::stream()
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting. Aborting operation."
+ << " Pass allowDiskUse:true to opt in.");
+ }
+
+ // We should check readOnly before getting here.
+ invariant(!storageGlobalParams.readOnly);
+
+ sort();
+ updateCutoff();
+
+ SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings);
+ for (size_t i = 0; i < _data.size(); i++) {
+ writer.addAlreadySorted(_data[i].first, _data[i].second);
+ }
+
+ // clear _data and release backing array's memory
+ std::vector<Data>().swap(_data);
+
+ Iterator* iteratorPtr = writer.done();
+ this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr));
+
+ _memUsed = 0;
+ }
+
+ const Comparator _comp;
+ const Settings _settings;
+ bool _done = false;
+ size_t _memUsed;
+
+ // Data that has not been spilled. Organized as max-heap if size == limit.
+ std::vector<Data> _data;
+
+ // See updateCutoff() for a full description of how these members are used.
+ bool _haveCutoff;
+ Data _cutoff; // We can definitely ignore values worse than this.
+ Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
+ size_t _worstCount; // Number of docs better or equal to _worstSeen kept so far.
+ Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit.
+ size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far.
+};
+
+} // namespace sorter
+
+template <typename Key, typename Value>
+Sorter<Key, Value>::Sorter(const SortOptions& opts)
+ : _opts(opts),
+ _file(opts.extSortAllowed
+ ? std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + nextFileName())
+ : nullptr) {}
+
+template <typename Key, typename Value>
+Sorter<Key, Value>::Sorter(const SortOptions& opts, const std::string& fileName)
+ : _opts(opts),
+ _file(std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + fileName)) {
+ invariant(opts.extSortAllowed);
+ invariant(!opts.tempDir.empty());
+ invariant(!fileName.empty());
+}
+
+template <typename Key, typename Value>
+typename Sorter<Key, Value>::PersistedState Sorter<Key, Value>::persistDataForShutdown() {
+ spill();
+ this->_file->keep();
+
+ std::vector<SorterRange> ranges;
+ ranges.reserve(_iters.size());
+ std::transform(_iters.begin(), _iters.end(), std::back_inserter(ranges), [](const auto it) {
+ return it->getRange();
+ });
+
+ return {_file->path().filename().string(), ranges};
+}
+
+template <typename Key, typename Value>
+Sorter<Key, Value>::File::~File() {
+ if (_keep) {
+ return;
+ }
+
+ if (_file.is_open()) {
+ DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit));
+ DESTRUCTOR_GUARD(_file.close());
+ }
+
+ DESTRUCTOR_GUARD(boost::filesystem::remove(_path));
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::read(std::streamoff offset, std::streamsize size, void* out) {
+ if (!_file.is_open()) {
+ _open();
+ }
+
+ if (_offset != -1) {
+ _file.exceptions(std::ios::goodbit);
+ _file.flush();
+ _offset = -1;
+
+ uassert(5479100,
+ str::stream() << "Error flushing file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file);
+ }
+
+ _file.seekg(offset);
+ _file.read(reinterpret_cast<char*>(out), size);
+
+ uassert(16817,
+ str::stream() << "Error reading file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file);
+
+ invariant(_file.gcount() == size,
+ str::stream() << "Number of bytes read (" << _file.gcount()
+ << ") not equal to expected number (" << size << ")");
+
+ uassert(51049,
+ str::stream() << "Error reading file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file.tellg() >= 0);
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::write(const char* data, std::streamsize size) {
+ _ensureOpenForWriting();
+
+ try {
+ _file.write(data, size);
+ _offset += size;
+ } catch (const std::system_error& ex) {
+ if (ex.code() == std::errc::no_space_on_device) {
+ uasserted(ErrorCodes::OutOfDiskSpace,
+ str::stream() << ex.what() << ": " << _path.string());
+ }
+ uasserted(5642403,
+ str::stream() << "Error writing to file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription());
+ } catch (const std::exception&) {
+ uasserted(16821,
+ str::stream() << "Error writing to file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription());
+ }
+}
+
+template <typename Key, typename Value>
+std::streamoff Sorter<Key, Value>::File::currentOffset() {
+ _ensureOpenForWriting();
+ return _offset;
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::_open() {
+ invariant(!_file.is_open());
+
+ boost::filesystem::create_directories(_path.parent_path());
+
+ // We open the provided file in append mode so that SortedFileWriter instances can share
+ // the same file, used serially. We want to share files in order to stay below system
+ // open file limits.
+ _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out);
+
+ uassert(16818,
+ str::stream() << "Error opening file " << _path.string() << ": "
+ << sorter::myErrnoWithDescription(),
+ _file.good());
+}
+
+template <typename Key, typename Value>
+void Sorter<Key, Value>::File::_ensureOpenForWriting() {
+ invariant(_offset != -1 || !_file.is_open());
+
+ if (_file.is_open()) {
+ return;
+ }
+
+ _open();
+ _file.exceptions(std::ios::failbit | std::ios::badbit);
+ _offset = boost::filesystem::file_size(_path);
+}
+
+//
+// SortedFileWriter
+//
+
+template <typename Key, typename Value>
+SortedFileWriter<Key, Value>::SortedFileWriter(
+ const SortOptions& opts,
+ std::shared_ptr<typename Sorter<Key, Value>::File> file,
+ const Settings& settings)
+ : _settings(settings),
+ _file(std::move(file)),
+ _fileStartOffset(_file->currentOffset()),
+ _dbName(opts.dbName) {
+ // This should be checked by consumers, but if we get here don't allow writes.
+ uassert(
+ 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos());
+
+ uassert(17148,
+ "Attempting to use external sort without setting SortOptions::tempDir",
+ !opts.tempDir.empty());
+}
+
+template <typename Key, typename Value>
+void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) {
+
+ // Offset that points to the place in the buffer where a new data object will be stored.
+ int _nextObjPos = _buffer.len();
+
+ // Add serialized key and value to the buffer.
+ key.serializeForSorter(_buffer);
+ val.serializeForSorter(_buffer);
+
+ // Serializing the key and value grows the buffer, but _buffer.buf() still points to the
+ // beginning. Use _buffer.len() to determine portion of buffer containing new datum.
+ _checksum =
+ addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum);
+
+ if (_buffer.len() > 64 * 1024)
+ spill();
+}
+
+template <typename Key, typename Value>
+void SortedFileWriter<Key, Value>::spill() {
+ int32_t size = _buffer.len();
+ char* outBuffer = _buffer.buf();
+
+ if (size == 0)
+ return;
+
+ std::string compressed;
+ snappy::Compress(outBuffer, size, &compressed);
+ verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max()));
+
+ const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9);
+ if (shouldCompress) {
+ size = compressed.size();
+ outBuffer = const_cast<char*>(compressed.data());
+ }
+
+ std::unique_ptr<char[]> out;
+ if (auto encryptionHooks = getEncryptionHooksIfEnabled()) {
+ size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer();
+ out.reset(new char[protectedSizeMax]);
+ size_t resultLen;
+ Status status = encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer),
+ size,
+ reinterpret_cast<uint8_t*>(out.get()),
+ protectedSizeMax,
+ &resultLen,
+ _dbName);
+ uassert(28842,
+ str::stream() << "Failed to compress data: " << status.toString(),
+ status.isOK());
+ outBuffer = out.get();
+ size = resultLen;
+ }
+
+ // Negative size means compressed.
+ size = shouldCompress ? -size : size;
+ _file->write(reinterpret_cast<const char*>(&size), sizeof(size));
+ _file->write(outBuffer, std::abs(size));
+
+ _buffer.reset();
+}
+
+template <typename Key, typename Value>
+SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() {
+ spill();
+
+ return new sorter::FileIterator<Key, Value>(
+ _file, _fileStartOffset, _file->currentOffset(), _settings, _dbName, _checksum);
+}
+
+//
+// Factory Functions
+//
+
+template <typename Key, typename Value>
+template <typename Comparator>
+SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge(
+ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const SortOptions& opts,
+ const Comparator& comp) {
+ return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp);
+}
+
+template <typename Key, typename Value>
+template <typename Comparator>
+Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings) {
+ checkNoExternalSortOnMongos(opts);
+
+ uassert(17149,
+ "Attempting to use external sort without setting SortOptions::tempDir",
+ !(opts.extSortAllowed && opts.tempDir.empty()));
+ switch (opts.limit) {
+ case 0:
+ return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings);
+ case 1:
+ return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp);
+ default:
+ return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings);
+ }
+}
+
+template <typename Key, typename Value>
+template <typename Comparator>
+Sorter<Key, Value>* Sorter<Key, Value>::makeFromExistingRanges(
+ const std::string& fileName,
+ const std::vector<SorterRange>& ranges,
+ const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings) {
+ checkNoExternalSortOnMongos(opts);
+
+ invariant(opts.limit == 0,
+ str::stream() << "Creating a Sorter from existing ranges is only available with the "
+ "NoLimitSorter (limit 0), but got limit "
+ << opts.limit);
+
+ return new sorter::NoLimitSorter<Key, Value, Comparator>(
+ fileName, ranges, opts, comp, settings);
+}
+} // namespace mongo
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index f714e62b7fe..357643b09b8 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -29,15 +29,25 @@
#pragma once
-#include "mongo/db/sorter/sorted_data_iterator.h"
+#include <third_party/murmurhash3/MurmurHash3.h>
+
+#include <boost/filesystem/path.hpp>
+#include <deque>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "mongo/bson/util/builder.h"
#include "mongo/db/sorter/sorter_gen.h"
+#include "mongo/util/bufreader.h"
-namespace mongo::sorter {
/**
- * The in-memory and external Sorter.
+ * This is the public API for the Sorter (both in-memory and external)
*
- * The Sorter is templated on Key and Value types, each of which require the following public
- * members:
+ * Many of the classes in this file are templated on Key and Value types which
+ * require the following public members:
*
* // A type carrying extra information used by the deserializer. Contents are
* // up to you, but it should be cheap to copy. Use an empty struct if your
@@ -58,62 +68,276 @@ namespace mongo::sorter {
* // Return *this if your type doesn't have an unowned state.
* Type getOwned() const;
*
- * CompFn is a function that compares std::pair<Key, Value> and returns an int less than, equal to,
- * or greater than 0 depending on how the two pairs compare with the same semantics as memcmp.
+ * Comparators are functors that that compare std::pair<Key, Value> and return an
+ * int less than, equal to, or greater than 0 depending on how the two pairs
+ * compare with the same semantics as memcmp.
+ * Example for Key=BSONObj, Value=int:
+ *
+ * class MyComparator {
+ * public:
+ * int operator()(const std::pair<BSONObj, int>& lhs,
+ * const std::pair<BSONObj, int>& rhs) {
+ * int ret = lhs.first.woCompare(rhs.first, _ord);
+ * if (ret)
+ * return ret;
+ *
+ * if (lhs.second > rhs.second) return 1;
+ * if (lhs.second == rhs.second) return 0;
+ * return -1;
+ * }
+ * Ordering _ord;
+ * };
+ */
+
+namespace mongo {
+
+/**
+ * Runtime options that control the Sorter's behavior
+ */
+struct SortOptions {
+ // The number of KV pairs to be returned. 0 indicates no limit.
+ unsigned long long limit;
+
+ // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate.
+ size_t maxMemoryUsageBytes;
+
+ // Whether we are allowed to spill to disk. If this is false and in-memory exceeds
+ // maxMemoryUsageBytes, we will uassert.
+ bool extSortAllowed;
+
+ // In case the sorter spills encrypted data to disk that must be readable even after process
+ // restarts, it must encrypt with a persistent key. This key is accessed using the database
+ // name that the sorted collection lives in. If encryption is enabled and dbName is boost::none,
+ // a temporary key is used.
+ boost::optional<std::string> dbName;
+
+ // Directory into which we place a file when spilling to disk. Must be explicitly set if
+ // extSortAllowed is true.
+ std::string tempDir;
+
+ // If set to true and sorted data fits into memory, sorted data will be moved into iterator
+ // instead of copying.
+ bool moveSortedDataIntoIterator;
+
+ SortOptions()
+ : limit(0),
+ maxMemoryUsageBytes(64 * 1024 * 1024),
+ extSortAllowed(false),
+ moveSortedDataIntoIterator(false) {}
+
+ // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true)
+
+ SortOptions& Limit(unsigned long long newLimit) {
+ limit = newLimit;
+ return *this;
+ }
+
+ SortOptions& MaxMemoryUsageBytes(size_t newMaxMemoryUsageBytes) {
+ maxMemoryUsageBytes = newMaxMemoryUsageBytes;
+ return *this;
+ }
+
+ SortOptions& ExtSortAllowed(bool newExtSortAllowed = true) {
+ extSortAllowed = newExtSortAllowed;
+ return *this;
+ }
+
+ SortOptions& TempDir(const std::string& newTempDir) {
+ tempDir = newTempDir;
+ return *this;
+ }
+
+ SortOptions& DBName(std::string newDbName) {
+ dbName = std::move(newDbName);
+ return *this;
+ }
+
+ SortOptions& MoveSortedDataIntoIterator(bool newMoveSortedDataIntoIterator = true) {
+ moveSortedDataIntoIterator = newMoveSortedDataIntoIterator;
+ return *this;
+ }
+};
+
+/**
+ * This is a 0-sized dummy object that satisfies Sorter's Key/Value interface.
+ */
+class NullValue {
+public:
+ struct SorterDeserializeSettings {}; // unused
+ void serializeForSorter(BufBuilder& buf) const {
+ return;
+ }
+ static NullValue deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
+ return {};
+ }
+ int memUsageForSorter() const {
+ return 0;
+ }
+ NullValue getOwned() const {
+ return {};
+ }
+};
+
+/**
+ * This is the sorted output iterator from the sorting framework.
+ */
+template <typename Key, typename Value>
+class SortIteratorInterface {
+ SortIteratorInterface(const SortIteratorInterface&) = delete;
+ SortIteratorInterface& operator=(const SortIteratorInterface&) = delete;
+
+public:
+ typedef std::pair<Key, Value> Data;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
+
+ // Unowned objects are only valid until next call to any method
+
+ virtual bool more() = 0;
+ virtual std::pair<Key, Value> next() = 0;
+
+ virtual ~SortIteratorInterface() {}
+
+ // Returns an iterator that merges the passed in iterators
+ template <typename Comparator>
+ static SortIteratorInterface* merge(
+ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters,
+ const SortOptions& opts,
+ const Comparator& comp);
+
+ // Opens and closes the source of data over which this class iterates, if applicable.
+ virtual void openSource() = 0;
+ virtual void closeSource() = 0;
+
+ virtual SorterRange getRange() const {
+ invariant(false, "Only FileIterator has ranges");
+ MONGO_UNREACHABLE;
+ }
+
+protected:
+ SortIteratorInterface() {} // can only be constructed as a base
+};
+
+/**
+ * This is the way to input data to the sorting framework.
+ *
+ * Each instance of this class will generate a file name and spill sorted data ranges to that file
+ * if allowed in its given Settings. If the instance destructs before done() is called, it will
+ * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for
+ * file deletion moves to the returned Iterator object, which must then delete the file upon its own
+ * destruction.
+ *
+ * All users of Sorter implementations must define their own nextFileName() function to generate
+ * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately
+ * directly included in multiple places, rather than compiled in one place and linked, and so cannot
+ * itself provide a globally unique ID for file names. See existing function implementations of
+ * nextFileName() for example.
*/
template <typename Key, typename Value>
class Sorter {
+ Sorter(const Sorter&) = delete;
+ Sorter& operator=(const Sorter&) = delete;
+
public:
- using Data = std::pair<Key, Value>;
- using Iterator = SortedDataIterator<Key, Value>;
- using CompFn = std::function<int(const Data&, const Data&)>;
- using Settings = std::pair<typename Key::SorterDeserializeSettings,
- typename Value::SorterDeserializeSettings>;
+ typedef std::pair<Key, Value> Data;
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
struct PersistedState {
std::string fileName;
std::vector<SorterRange> ranges;
};
- explicit Sorter(const CompFn& comp) : _comp(comp) {}
+ /**
+ * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or
+ * reading without any writing), but does not support writing after any reading has been done.
+ */
+ class File {
+ public:
+ File(std::string path) : _path(std::move(path)) {
+ invariant(!_path.empty());
+ }
- Sorter(const Sorter&) = delete;
- Sorter& operator=(const Sorter&) = delete;
+ ~File();
- virtual ~Sorter() {}
+ const boost::filesystem::path& path() const {
+ return _path;
+ }
- /**
- * Adds the key/value. The Sorter may make its own owned copy of the data.
- */
- virtual void add(const Key& key, const Value& value) = 0;
+ /**
+ * Signals that the on-disk file should not be cleaned up.
+ */
+ void keep() {
+ _keep = true;
+ };
+
+ /**
+ * Reads the requested data from the file. Cannot write more to the file once this has been
+ * called.
+ */
+ void read(std::streamoff offset, std::streamsize size, void* out);
+
+ /**
+ * Writes the given data to the end of the file. Cannot be called after reading.
+ */
+ void write(const char* data, std::streamsize size);
+
+ /**
+ * Returns the current offset of the end of the file. Cannot be called after reading.
+ */
+ std::streamoff currentOffset();
+
+ private:
+ void _open();
+
+ void _ensureOpenForWriting();
+
+ boost::filesystem::path _path;
+ std::fstream _file;
+
+ // The current offset of the end of the file, or -1 if the file either has not yet been
+ // opened or is already being read.
+ std::streamoff _offset = -1;
+
+ // Whether to keep the on-disk file even after this in-memory object has been destructed.
+ bool _keep = false;
+ };
+
+ explicit Sorter(const SortOptions& opts);
/**
- * Adds the key/value which aleady own their underlying data. The Sorter may (or may not) take
- * advantage of this by not making a new copy of the data.
+ * ExtSort-only constructor. fileName is the base name of a file in the temp directory.
*/
- virtual void addOwned(Key&& key, Value&& value) {
- add(key, value);
- }
+ Sorter(const SortOptions& opts, const std::string& fileName);
+
+ template <typename Comparator>
+ static Sorter* make(const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings());
+ template <typename Comparator>
+ static Sorter* makeFromExistingRanges(const std::string& fileName,
+ const std::vector<SorterRange>& ranges,
+ const SortOptions& opts,
+ const Comparator& comp,
+ const Settings& settings = Settings());
+
+ virtual void add(const Key&, const Value&) = 0;
+ virtual void emplace(Key&& k, Value&& v) {
+ add(k, v);
+ }
/**
- * Returns an Iterator to iterate through the sorted data. The Sorter must outlive the returned
- * Iterator.
- *
- * If the Sorter does not spill to disk, the specified return policy determines whether the
- * Iterator returns data from the Sorter via move or via copy. Returning data via copy is useful
- * if you may later need the data to still be intact in order to be spilled.
- *
- * If the sorter does spill to disk, the specified return policy is ignored.
+ * Cannot add more data after calling done().
*/
- virtual std::unique_ptr<Iterator> done(
- typename Iterator::ReturnPolicy returnPolicy = Iterator::ReturnPolicy::kMove) = 0;
+ virtual Iterator* done() = 0;
- virtual PersistedState persistDataForShutdown() {
- MONGO_UNREACHABLE;
- }
+ virtual ~Sorter() {}
- virtual size_t numSpills() const {
- return 0;
+ size_t numSpills() const {
+ return _iters.size();
}
size_t numSorted() const {
@@ -124,12 +348,98 @@ public:
return _totalDataSizeSorted;
}
+ PersistedState persistDataForShutdown();
+
protected:
- const CompFn _comp;
+ Sorter() {} // can only be constructed as a base
+
+ virtual void spill() = 0;
+
+ size_t _numSorted = 0; // Keeps track of the number of keys sorted.
+ uint64_t _totalDataSizeSorted = 0; // Keeps track of the total size of data sorted.
+
+ SortOptions _opts;
+
+ std::shared_ptr<File> _file;
+
+ std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled.
+};
+
+/**
+ * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file
+ * range.
+ */
+template <typename Key, typename Value>
+class SortedFileWriter {
+ SortedFileWriter(const SortedFileWriter&) = delete;
+ SortedFileWriter& operator=(const SortedFileWriter&) = delete;
- size_t _numSorted = 0;
- uint64_t _totalDataSizeSorted = 0;
+public:
+ typedef SortIteratorInterface<Key, Value> Iterator;
+ typedef std::pair<typename Key::SorterDeserializeSettings,
+ typename Value::SorterDeserializeSettings>
+ Settings;
+
+ explicit SortedFileWriter(const SortOptions& opts,
+ std::shared_ptr<typename Sorter<Key, Value>::File> file,
+ const Settings& settings = Settings());
+
+ void addAlreadySorted(const Key&, const Value&);
+
+ /**
+ * Spills any data remaining in the buffer to disk and then closes the file to which data was
+ * written.
+ *
+ * No more data can be added via addAlreadySorted() after calling done().
+ */
+ Iterator* done();
- bool _done = false;
+private:
+ void spill();
+
+ const Settings _settings;
+ std::shared_ptr<typename Sorter<Key, Value>::File> _file;
+ BufBuilder _buffer;
+
+ // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator
+ // to ensure data has not been corrupted after reading from disk.
+ uint32_t _checksum = 0;
+
+ // Tracks where in the file we started writing the sorted data range so that the information can
+ // be given to the Iterator in done().
+ std::streamoff _fileStartOffset;
+
+ boost::optional<std::string> _dbName;
};
-} // namespace mongo::sorter
+} // namespace mongo
+
+/**
+ * #include "mongo/db/sorter/sorter.cpp" and call this in a single translation
+ * unit once for each unique set of template parameters.
+ */
+#define MONGO_CREATE_SORTER(Key, Value, Comparator) \
+ /* public classes */ \
+ template class ::mongo::Sorter<Key, Value>; \
+ template class ::mongo::SortIteratorInterface<Key, Value>; \
+ template class ::mongo::SortedFileWriter<Key, Value>; \
+ /* internal classes */ \
+ template class ::mongo::sorter::NoLimitSorter<Key, Value, Comparator>; \
+ template class ::mongo::sorter::LimitOneSorter<Key, Value, Comparator>; \
+ template class ::mongo::sorter::TopKSorter<Key, Value, Comparator>; \
+ template class ::mongo::sorter::MergeIterator<Key, Value, Comparator>; \
+ template class ::mongo::sorter::InMemIterator<Key, Value>; \
+ template class ::mongo::sorter::FileIterator<Key, Value>; \
+ /* factory functions */ \
+ template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \
+ SortIteratorInterface<Key, Value>::merge<Comparator>( \
+ const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \
+ const SortOptions& opts, \
+ const Comparator& comp); \
+ template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \
+ const SortOptions& opts, const Comparator& comp, const Settings& settings); \
+ template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::makeFromExistingRanges< \
+ Comparator>(const std::string& fileName, \
+ const std::vector<SorterRange>& ranges, \
+ const SortOptions& opts, \
+ const Comparator& comp, \
+ const Settings& settings);
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index f9b887515a1..4607fef685c 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -29,6 +29,8 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+#include "mongo/platform/basic.h"
+
#include <boost/filesystem.hpp>
#include <fstream>
#include <memory>
@@ -36,10 +38,6 @@
#include "mongo/base/data_type_endian.h"
#include "mongo/base/static_assert.h"
#include "mongo/config.h"
-#include "mongo/db/sorter/factory.h"
-#include "mongo/db/sorter/in_mem_iterator.h"
-#include "mongo/db/sorter/single_elem_iterator.h"
-#include "mongo/db/sorter/sorted_file_writer.h"
#include "mongo/db/sorter/sorter.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/random.h"
@@ -48,9 +46,36 @@
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
-namespace mongo::sorter {
+
+namespace mongo {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> sorterTestFileCounter;
+ return "extsort-sorter-test." + std::to_string(sorterTestFileCounter.fetchAndAdd(1));
+}
+
+} // namespace mongo
+
+// Need access to internal classes
+#include "mongo/db/sorter/sorter.cpp"
+
+namespace mongo {
+namespace sorter {
namespace {
+//
+// Sorter framework testing utilities
+//
+
class IntWrapper {
public:
IntWrapper(int i = 0) : _i(i) {}
@@ -78,7 +103,7 @@ private:
};
typedef std::pair<IntWrapper, IntWrapper> IWPair;
-typedef SortedDataIterator<IntWrapper, IntWrapper> IWIterator;
+typedef SortIteratorInterface<IntWrapper, IntWrapper> IWIterator;
typedef Sorter<IntWrapper, IntWrapper> IWSorter;
enum Direction { ASC = 1, DESC = -1 };
@@ -103,7 +128,7 @@ public:
: _current(start), _increment(increment), _stop(stop) {}
void openSource() {}
void closeSource() {}
- bool more() const {
+ bool more() {
if (_increment == 0)
return true;
if (_increment > 0)
@@ -126,7 +151,7 @@ class EmptyIterator : public IWIterator {
public:
void openSource() {}
void closeSource() {}
- bool more() const {
+ bool more() {
return false;
}
Data next() {
@@ -136,15 +161,15 @@ public:
class LimitIterator : public IWIterator {
public:
- LimitIterator(long long limit, std::unique_ptr<IWIterator> source)
- : _remaining(limit), _source(std::move(source)) {
+ LimitIterator(long long limit, std::shared_ptr<IWIterator> source)
+ : _remaining(limit), _source(source) {
verify(limit > 0);
}
void openSource() {}
void closeSource() {}
- bool more() const {
+ bool more() {
return _remaining && _source->more();
}
Data next() {
@@ -155,13 +180,15 @@ public:
private:
long long _remaining;
- std::unique_ptr<IWIterator> _source;
+ std::shared_ptr<IWIterator> _source;
};
template <typename It1, typename It2>
void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
int iteration;
try {
+ it1->openSource();
+ it2->openSource();
for (iteration = 0; true; iteration++) {
ASSERT_EQUALS(it1->more(), it2->more());
ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice
@@ -173,68 +200,67 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) {
ASSERT_EQUALS(pair1.first, pair2.first);
ASSERT_EQUALS(pair1.second, pair2.second);
}
+ it1->closeSource();
+ it2->closeSource();
} catch (...) {
- LOGV2(22047, "Failure", "line"_attr = line, "iteration"_attr = iteration);
+ LOGV2(22047,
+ "Failure from line {line} on iteration {iteration}",
+ "line"_attr = line,
+ "iteration"_attr = iteration);
+ it1->closeSource();
+ it2->closeSource();
throw;
}
}
#define ASSERT_ITERATORS_EQUIVALENT(it1, it2) _assertIteratorsEquivalent(it1, it2, __LINE__)
-std::vector<IWPair> makeDataForInMemIterator(const std::vector<int>& ints) {
- std::vector<IWPair> data;
- for (auto i : ints) {
- data.emplace_back(i, -i);
- }
- return data;
-}
-
-std::unique_ptr<IWIterator> makeInMemIterator(std::vector<IWPair>& data) {
- return std::make_unique<InMemIterator<IntWrapper, IntWrapper>>(data,
- IWIterator::ReturnPolicy::kMove);
+template <int N>
+std::shared_ptr<IWIterator> makeInMemIterator(const int (&array)[N]) {
+ std::vector<IWPair> vec;
+ for (int i = 0; i < N; i++)
+ vec.push_back(IWPair(array[i], -array[i]));
+ return std::make_shared<sorter::InMemIterator<IntWrapper, IntWrapper>>(vec);
}
-std::unique_ptr<IWIterator> mergeIterators(
- const std::vector<std::unique_ptr<IWIterator>>& iterators,
- Direction Dir = ASC,
- const Options& opts = Options()) {
- invariant(!opts.tempDir);
- return std::make_unique<MergeIterator<IntWrapper, IntWrapper>>(
- iterators, opts.limit, IWComparator(Dir));
+template <typename IteratorPtr, int N>
+std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N],
+ Direction Dir = ASC,
+ const SortOptions& opts = SortOptions()) {
+ invariant(!opts.extSortAllowed);
+ std::vector<std::shared_ptr<IWIterator>> vec;
+ for (int i = 0; i < N; i++)
+ vec.push_back(std::shared_ptr<IWIterator>(array[i]));
+ return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir)));
}
//
// Tests for Sorter framework internals
//
-class SingleElemIterTests {
+class InMemIterTests {
+public:
void run() {
{
EmptyIterator empty;
- SingleElemIterator<IntWrapper, IntWrapper> singleElem{IWIterator::ReturnPolicy::kMove};
- ASSERT_ITERATORS_EQUIVALENT(&singleElem, &empty);
+ sorter::InMemIterator<IntWrapper, IntWrapper> inMem;
+ ASSERT_ITERATORS_EQUIVALENT(&inMem, &empty);
}
- }
-};
-
-class InMemIterTests {
-public:
- void run() {
{
- auto data = makeDataForInMemIterator(
- {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
- ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(data),
- std::make_unique<IntIterator>(0, 20));
+ static const int zeroUpTo20[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+ ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(zeroUpTo20),
+ std::make_shared<IntIterator>(0, 20));
}
{
- // Make sure InMemIterator doesn't do any reordering on it's own.
- static std::vector<int> unsorted{6, 3, 7, 4, 0, 9, 5, 7, 1, 8};
+ // make sure InMemIterator doesn't do any reordering on it's own
+ static const int unsorted[] = {6, 3, 7, 4, 0, 9, 5, 7, 1, 8};
class UnsortedIter : public IWIterator {
public:
UnsortedIter() : _pos(0) {}
void openSource() {}
void closeSource() {}
- bool more() const {
- return _pos < unsorted.size();
+ bool more() {
+ return _pos < sizeof(unsorted) / sizeof(unsorted[0]);
}
IWPair next() {
IWPair ret(unsorted[_pos], -unsorted[_pos]);
@@ -244,8 +270,7 @@ public:
size_t _pos;
} unsortedIter;
- auto data = makeDataForInMemIterator(unsorted);
- ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(data),
+ ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(unsorted),
static_cast<IWIterator*>(&unsortedIter));
}
}
@@ -255,27 +280,29 @@ class SortedFileWriterAndFileIteratorTests {
public:
void run() {
unittest::TempDir tempDir("sortedFileWriterTests");
- Options opts;
- opts.tempDir = tempDir.path();
+ const SortOptions opts = SortOptions().TempDir(tempDir.path());
+ auto makeFile = [&] {
+ return std::make_shared<Sorter<IntWrapper, IntWrapper>::File>(opts.tempDir + "/" +
+ nextFileName());
+ };
{ // small
- auto file = std::make_unique<File>(*opts.tempDir + "/" + nextFileName("sorter-test"));
- SortedFileWriter<IntWrapper, IntWrapper> sorter(file.get());
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile());
sorter.addAlreadySorted(0, 0);
sorter.addAlreadySorted(1, -1);
sorter.addAlreadySorted(2, -2);
sorter.addAlreadySorted(3, -3);
sorter.addAlreadySorted(4, -4);
- ASSERT_ITERATORS_EQUIVALENT(sorter.done(), std::make_unique<IntIterator>(0, 5));
+ ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
+ std::make_shared<IntIterator>(0, 5));
}
{ // big
- auto file = std::make_unique<File>(*opts.tempDir + "/" + nextFileName("sorter-test"));
- SortedFileWriter<IntWrapper, IntWrapper> sorter(file.get());
+ SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile());
for (int i = 0; i < 10 * 1000 * 1000; i++)
sorter.addAlreadySorted(i, -i);
- ASSERT_ITERATORS_EQUIVALENT(sorter.done(),
- std::make_unique<IntIterator>(0, 10 * 1000 * 1000));
+ ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()),
+ std::make_shared<IntIterator>(0, 10 * 1000 * 1000));
}
ASSERT(boost::filesystem::is_empty(tempDir.path()));
@@ -287,48 +314,49 @@ class MergeIteratorTests {
public:
void run() {
{ // test empty (no inputs)
- ASSERT_ITERATORS_EQUIVALENT(mergeIterators({}, ASC), std::make_unique<EmptyIterator>());
+ std::vector<std::shared_ptr<IWIterator>> vec;
+ std::shared_ptr<IWIterator> mergeIter(
+ IWIterator::merge(vec, SortOptions(), IWComparator()));
+ ASSERT_ITERATORS_EQUIVALENT(mergeIter, std::make_shared<EmptyIterator>());
}
{ // test empty (only empty inputs)
- std::vector<std::unique_ptr<IWIterator>> iterators;
- iterators.push_back(std::make_unique<EmptyIterator>());
- iterators.push_back(std::make_unique<EmptyIterator>());
- iterators.push_back(std::make_unique<EmptyIterator>());
+ std::shared_ptr<IWIterator> iterators[] = {std::make_shared<EmptyIterator>(),
+ std::make_shared<EmptyIterator>(),
+ std::make_shared<EmptyIterator>()};
ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, ASC),
- std::make_unique<EmptyIterator>());
+ std::make_shared<EmptyIterator>());
}
{ // test ASC
- std::vector<std::unique_ptr<IWIterator>> iterators;
- iterators.push_back(std::make_unique<IntIterator>(1, 20, 2)); // 1, 3, ... 19
- iterators.push_back(std::make_unique<IntIterator>(0, 20, 2)); // 0, 2, ... 18
+ std::shared_ptr<IWIterator> iterators[] = {
+ std::make_shared<IntIterator>(1, 20, 2) // 1, 3, ... 19
+ ,
+ std::make_shared<IntIterator>(0, 20, 2) // 0, 2, ... 18
+ };
ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, ASC),
- std::make_unique<IntIterator>(0, 20, 1));
+ std::make_shared<IntIterator>(0, 20, 1));
}
{ // test DESC with an empty source
- std::vector<std::unique_ptr<IWIterator>> iterators;
- iterators.push_back(std::make_unique<IntIterator>(30, 0, -3)); // 30, 27, ... 3
- iterators.push_back(std::make_unique<IntIterator>(29, 0, -3)); // 29, 26, ... 2
- iterators.push_back(std::make_unique<IntIterator>(28, 0, -3)); // 28, 25, ... 1
- iterators.push_back(std::make_unique<EmptyIterator>()); // 28, 25, ... 1
+ std::shared_ptr<IWIterator> iterators[] = {
+ std::make_shared<IntIterator>(30, 0, -3), // 30, 27, ... 3
+ std::make_shared<IntIterator>(29, 0, -3), // 29, 26, ... 2
+ std::make_shared<IntIterator>(28, 0, -3), // 28, 25, ... 1
+ std::make_shared<EmptyIterator>()};
ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, DESC),
- std::make_unique<IntIterator>(30, 0, -1));
+ std::make_shared<IntIterator>(30, 0, -1));
}
{ // test Limit
- std::vector<std::unique_ptr<IWIterator>> iterators;
- iterators.push_back(std::make_unique<IntIterator>(1, 20, 2)); // 1, 3, ... 19
- iterators.push_back(std::make_unique<IntIterator>(0, 20, 2)); // 0, 2, ... 18
-
- Options opts;
- opts.limit = 10;
+ std::shared_ptr<IWIterator> iterators[] = {
+ std::make_shared<IntIterator>(1, 20, 2), // 1, 3, ... 19
+ std::make_shared<IntIterator>(0, 20, 2)}; // 0, 2, ... 18
ASSERT_ITERATORS_EQUIVALENT(
- mergeIterators(iterators, ASC, opts),
- std::make_unique<LimitIterator>(10, std::make_unique<IntIterator>(0, 20, 1)));
+ mergeIterators(iterators, ASC, SortOptions().Limit(10)),
+ std::make_shared<LimitIterator>(10, std::make_shared<IntIterator>(0, 20, 1)));
}
}
};
@@ -340,63 +368,54 @@ public:
void run() {
unittest::TempDir tempDir("sorterTests");
- Options opts;
- opts.tempDir = tempDir.path();
+ const SortOptions opts = SortOptions().TempDir(tempDir.path()).ExtSortAllowed();
{ // test empty (no limit)
- ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(),
- std::make_unique<EmptyIterator>());
+ ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(opts).get()),
+ std::make_shared<EmptyIterator>());
}
{ // test empty (limit 1)
- opts.limit = 1;
- ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(),
- std::make_unique<EmptyIterator>());
+ ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(SortOptions(opts).Limit(1)).get()),
+ std::make_shared<EmptyIterator>());
}
{ // test empty (limit 10)
- opts.limit = 10;
- ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(),
- std::make_unique<EmptyIterator>());
+ ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(SortOptions(opts).Limit(10)).get()),
+ std::make_shared<EmptyIterator>());
}
- opts.limit = 0;
const auto runTests = [this, &opts](bool assertRanges) {
{ // test all data ASC
- std::unique_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(ASC));
+ std::shared_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(ASC));
addData(sorter.get());
- ASSERT_ITERATORS_EQUIVALENT(sorter->done(), correct());
+ ASSERT_ITERATORS_EQUIVALENT(done(sorter.get()), correct());
ASSERT_EQ(numAdded(), sorter->numSorted());
if (assertRanges) {
assertRangeInfo(sorter, opts);
}
}
{ // test all data DESC
- std::unique_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(DESC));
+ std::shared_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(DESC));
addData(sorter.get());
- ASSERT_ITERATORS_EQUIVALENT(sorter->done(), correctReverse());
+ ASSERT_ITERATORS_EQUIVALENT(done(sorter.get()), correctReverse());
ASSERT_EQ(numAdded(), sorter->numSorted());
if (assertRanges) {
assertRangeInfo(sorter, opts);
}
}
- // The debug builds are too slow to run these tests.
- // Among other things, MSVC++ makes all heap functions O(N) not O(logN).
+// The debug builds are too slow to run these tests.
+// Among other things, MSVC++ makes all heap functions O(N) not O(logN).
#if !defined(MONGO_CONFIG_DEBUG_BUILD)
{ // merge all data ASC
- std::unique_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(ASC)),
+ std::shared_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(ASC)),
makeSorter(opts, IWComparator(ASC))};
addData(sorters[0].get());
addData(sorters[1].get());
- std::vector<std::unique_ptr<IWIterator>> iters1;
- iters1.push_back(sorters[0]->done());
- iters1.push_back(sorters[1]->done());
-
- std::vector<std::unique_ptr<IWIterator>> iters2;
- iters2.push_back(correct());
- iters2.push_back(correct());
-
+ std::shared_ptr<IWIterator> iters1[] = {done(sorters[0].get()),
+ done(sorters[1].get())};
+ std::shared_ptr<IWIterator> iters2[] = {correct(), correct()};
ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iters1, ASC),
mergeIterators(iters2, ASC));
@@ -406,22 +425,16 @@ public:
}
}
{ // merge all data DESC and use multiple threads to insert
- std::unique_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(DESC)),
+ std::shared_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(DESC)),
makeSorter(opts, IWComparator(DESC))};
stdx::thread inBackground(&Basic::addData, this, sorters[0].get());
addData(sorters[1].get());
inBackground.join();
- std::vector<std::unique_ptr<IWIterator>> iters1;
-
- iters1.push_back(sorters[0]->done());
- iters1.push_back(sorters[1]->done());
-
- std::vector<std::unique_ptr<IWIterator>> iters2;
- iters2.push_back(correctReverse());
- iters2.push_back(correctReverse());
-
+ std::shared_ptr<IWIterator> iters1[] = {done(sorters[0].get()),
+ done(sorters[1].get())};
+ std::shared_ptr<IWIterator> iters2[] = {correctReverse(), correctReverse()};
ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iters1, DESC),
mergeIterators(iters2, DESC));
@@ -463,13 +476,13 @@ public:
}
// returns an iterator with the correct results
- virtual std::unique_ptr<IWIterator> correct() {
- return std::make_unique<IntIterator>(0, 5); // 0, 1, ... 4
+ virtual std::shared_ptr<IWIterator> correct() {
+ return std::make_shared<IntIterator>(0, 5); // 0, 1, ... 4
}
// like correct but with opposite sort direction
- virtual std::unique_ptr<IWIterator> correctReverse() {
- return std::make_unique<IntIterator>(4, -1, -1); // 4, 3, ... 0
+ virtual std::shared_ptr<IWIterator> correctReverse() {
+ return std::make_shared<IntIterator>(4, -1, -1); // 4, 3, ... 0
}
virtual size_t correctNumRanges() const {
@@ -477,23 +490,27 @@ public:
}
// It is safe to ignore / overwrite any part of options
- virtual Options adjustSortOptions(Options opts) {
+ virtual SortOptions adjustSortOptions(SortOptions opts) {
return opts;
}
private:
// Make a new sorter with desired opts and comp. Opts may be ignored but not comp
- std::unique_ptr<IWSorter> makeSorter(Options opts, IWComparator comp = IWComparator(ASC)) {
- return sorter::make<IntWrapper, IntWrapper>("sorter-test", adjustSortOptions(opts), comp);
+ std::shared_ptr<IWSorter> makeSorter(SortOptions opts, IWComparator comp = IWComparator(ASC)) {
+ return std::shared_ptr<IWSorter>(IWSorter::make(adjustSortOptions(opts), comp));
+ }
+
+ std::shared_ptr<IWIterator> done(IWSorter* sorter) {
+ return std::shared_ptr<IWIterator>(sorter->done());
}
- void assertRangeInfo(const std::unique_ptr<IWSorter>& sorter, const Options& opts) {
+ void assertRangeInfo(const std::shared_ptr<IWSorter>& sorter, const SortOptions& opts) {
auto numRanges = correctNumRanges();
if (numRanges == 0)
return;
auto state = sorter->persistDataForShutdown();
- if (opts.tempDir) {
+ if (opts.extSortAllowed) {
ASSERT_NE(state.fileName, "");
}
ASSERT_EQ(state.ranges.size(), numRanges);
@@ -501,9 +518,8 @@ private:
};
class Limit : public Basic {
- Options adjustSortOptions(Options opts) override {
- opts.limit = 5;
- return opts;
+ SortOptions adjustSortOptions(SortOptions opts) override {
+ return opts.Limit(5);
}
void addData(IWSorter* sorter) override {
sorter->add(0, 0);
@@ -516,19 +532,18 @@ class Limit : public Basic {
size_t numAdded() const override {
return 6;
}
- std::unique_ptr<IWIterator> correct() override {
- return std::make_unique<IntIterator>(-1, 4);
+ std::shared_ptr<IWIterator> correct() override {
+ return std::make_shared<IntIterator>(-1, 4);
}
- std::unique_ptr<IWIterator> correctReverse() override {
- return std::make_unique<IntIterator>(4, -1, -1);
+ std::shared_ptr<IWIterator> correctReverse() override {
+ return std::make_shared<IntIterator>(4, -1, -1);
}
};
template <uint64_t Limit>
class LimitExtreme : public Basic {
- Options adjustSortOptions(Options opts) override {
- opts.limit = Limit;
- return opts;
+ SortOptions adjustSortOptions(SortOptions opts) override {
+ return opts.Limit(Limit);
}
};
@@ -548,13 +563,13 @@ class Dupes : public Basic {
size_t numAdded() const override {
return 10;
}
- std::unique_ptr<IWIterator> correct() override {
- static auto data = makeDataForInMemIterator({-1, -1, -1, 0, 1, 1, 1, 2, 2, 3});
- return makeInMemIterator(data);
+ std::shared_ptr<IWIterator> correct() override {
+ const int array[] = {-1, -1, -1, 0, 1, 1, 1, 2, 2, 3};
+ return makeInMemIterator(array);
}
- std::unique_ptr<IWIterator> correctReverse() override {
- static auto data = makeDataForInMemIterator({3, 2, 2, 1, 1, 1, 0, -1, -1, -1});
- return makeInMemIterator(data);
+ std::shared_ptr<IWIterator> correctReverse() override {
+ const int array[] = {3, 2, 2, 1, 1, 1, 0, -1, -1, -1};
+ return makeInMemIterator(array);
}
};
@@ -569,14 +584,12 @@ public:
std::shuffle(_array.get(), _array.get() + NUM_ITEMS, _random.urbg());
}
- Options adjustSortOptions(Options opts) override {
+ SortOptions adjustSortOptions(SortOptions opts) override {
// Make sure we use a reasonable number of files when we spill
MONGO_STATIC_ASSERT((NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT > 50);
MONGO_STATIC_ASSERT((NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT < 500);
- opts.maxMemoryUsageBytes = MEM_LIMIT;
-
- return opts;
+ return opts.MaxMemoryUsageBytes(MEM_LIMIT).ExtSortAllowed();
}
void addData(IWSorter* sorter) override {
@@ -588,11 +601,11 @@ public:
return NUM_ITEMS;
}
- std::unique_ptr<IWIterator> correct() override {
- return std::make_unique<IntIterator>(0, NUM_ITEMS);
+ std::shared_ptr<IWIterator> correct() override {
+ return std::make_shared<IntIterator>(0, NUM_ITEMS);
}
- std::unique_ptr<IWIterator> correctReverse() override {
- return std::make_unique<IntIterator>(NUM_ITEMS - 1, -1, -1);
+ std::shared_ptr<IWIterator> correctReverse() override {
+ return std::make_shared<IntIterator>(NUM_ITEMS - 1, -1, -1);
}
size_t correctNumRanges() const override {
@@ -613,7 +626,7 @@ public:
template <long long Limit, bool Random = true>
class LotsOfDataWithLimit : public LotsOfDataLittleMemory<Random> {
typedef LotsOfDataLittleMemory<Random> Parent;
- Options adjustSortOptions(Options opts) {
+ SortOptions adjustSortOptions(SortOptions opts) {
// Make sure our tests will spill or not as desired
MONGO_STATIC_ASSERT(MEM_LIMIT / 2 > (100 * sizeof(IWPair)));
MONGO_STATIC_ASSERT(MEM_LIMIT < (5000 * sizeof(IWPair)));
@@ -623,16 +636,13 @@ class LotsOfDataWithLimit : public LotsOfDataLittleMemory<Random> {
MONGO_STATIC_ASSERT((Parent::NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT > 100);
MONGO_STATIC_ASSERT((Parent::NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT < 500);
- opts.maxMemoryUsageBytes = MEM_LIMIT;
- opts.limit = Limit;
-
- return opts;
+ return opts.MaxMemoryUsageBytes(MEM_LIMIT).ExtSortAllowed().Limit(Limit);
}
- std::unique_ptr<IWIterator> correct() override {
- return std::make_unique<LimitIterator>(Limit, Parent::correct());
+ std::shared_ptr<IWIterator> correct() override {
+ return std::make_shared<LimitIterator>(Limit, Parent::correct());
}
- std::unique_ptr<IWIterator> correctReverse() override {
- return std::make_unique<LimitIterator>(Limit, Parent::correctReverse());
+ std::shared_ptr<IWIterator> correctReverse() override {
+ return std::make_shared<LimitIterator>(Limit, Parent::correctReverse());
}
size_t correctNumRanges() const override {
// For the TopKSorter, the number of ranges depends on the specific composition of the data
@@ -707,53 +717,55 @@ DEATH_TEST_F(
SorterMakeFromExistingRangesTest,
NonZeroLimit,
"Creating a Sorter from existing ranges is only available with the NoLimitSorter (limit 0)") {
- Options opts;
- opts.limit = 1;
- opts.tempDir = "unused_temp_dir";
- sorter::makeFromExistingRanges<IntWrapper, IntWrapper>("", {}, opts, IWComparator(ASC));
+ auto opts = SortOptions().Limit(1ULL);
+ IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC));
}
-DEATH_TEST_F(SorterMakeFromExistingRangesTest, ExtSortNotAllowed, "options.tempDir") {
- Options opts;
- ASSERT_FALSE(opts.tempDir);
- sorter::makeFromExistingRanges<IntWrapper, IntWrapper>("", {}, opts, IWComparator(ASC));
+DEATH_TEST_F(SorterMakeFromExistingRangesTest, ExtSortNotAllowed, "opts.extSortAllowed") {
+ auto opts = SortOptions();
+ ASSERT_FALSE(opts.extSortAllowed);
+ IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC));
+}
+
+DEATH_TEST_F(SorterMakeFromExistingRangesTest, EmptyTempDir, "!opts.tempDir.empty()") {
+ auto opts = SortOptions().ExtSortAllowed();
+ ASSERT_EQUALS("", opts.tempDir);
+ IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC));
}
DEATH_TEST_F(SorterMakeFromExistingRangesTest, EmptyFileName, "!fileName.empty()") {
std::string fileName;
- Options opts;
- opts.tempDir = "unused_temp_dir";
- sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(fileName, {}, opts, IWComparator(ASC));
+ auto opts = SortOptions().ExtSortAllowed().TempDir("unused_temp_dir");
+ IWSorter::makeFromExistingRanges(fileName, {}, opts, IWComparator(ASC));
}
TEST_F(SorterMakeFromExistingRangesTest, SkipFileCheckingOnEmptyRanges) {
auto fileName = "unused_sorter_file";
- Options opts;
- opts.tempDir = "unused_temp_dir";
- auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(
- fileName, {}, opts, IWComparator(ASC));
+ auto opts = SortOptions().ExtSortAllowed().TempDir("unused_temp_dir");
+ auto sorter = std::unique_ptr<IWSorter>(
+ IWSorter::makeFromExistingRanges(fileName, {}, opts, IWComparator(ASC)));
ASSERT_EQ(0, sorter->numSpills());
auto iter = std::unique_ptr<IWIterator>(sorter->done());
ASSERT_EQ(0, sorter->numSorted());
+ iter->openSource();
ASSERT_FALSE(iter->more());
+ iter->closeSource();
}
TEST_F(SorterMakeFromExistingRangesTest, MissingFile) {
auto fileName = "unused_sorter_file";
auto tempDir = "unused_temp_dir";
- Options opts;
- opts.tempDir = tempDir;
- auto makeSorter = [&] {
- sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(
- fileName, makeSampleRanges(), opts, IWComparator(ASC));
- };
- ASSERT_THROWS_WITH_CHECK(makeSorter(), std::exception, [&](const auto& ex) {
- ASSERT_STRING_CONTAINS(ex.what(), tempDir);
- ASSERT_STRING_CONTAINS(ex.what(), fileName);
- });
+ auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir);
+ ASSERT_THROWS_WITH_CHECK(
+ IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC)),
+ std::exception,
+ [&](const auto& ex) {
+ ASSERT_STRING_CONTAINS(ex.what(), tempDir);
+ ASSERT_STRING_CONTAINS(ex.what(), fileName);
+ });
}
TEST_F(SorterMakeFromExistingRangesTest, EmptyFile) {
@@ -762,14 +774,12 @@ TEST_F(SorterMakeFromExistingRangesTest, EmptyFile) {
ASSERT(std::ofstream(tempFilePath.string()))
<< "failed to create empty temporary file: " << tempFilePath.string();
auto fileName = tempFilePath.filename().string();
- Options opts;
- opts.tempDir = tempDir.path();
- auto makeSorter = [&] {
- sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(
- fileName, makeSampleRanges(), opts, IWComparator(ASC));
- };
+ auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir.path());
// 16815 - unexpected empty file.
- ASSERT_THROWS_CODE(makeSorter(), DBException, 16815);
+ ASSERT_THROWS_CODE(
+ IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC)),
+ DBException,
+ 16815);
}
TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) {
@@ -781,10 +791,9 @@ TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) {
ofs << "invalid sorter data";
}
auto fileName = tempFilePath.filename().string();
- Options opts;
- opts.tempDir = tempDir.path();
- auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(
- fileName, makeSampleRanges(), opts, IWComparator(ASC));
+ auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir.path());
+ auto sorter = std::unique_ptr<IWSorter>(
+ IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC)));
// The number of spills is set when NoLimitSorter is constructed from existing ranges.
ASSERT_EQ(makeSampleRanges().size(), sorter->numSpills());
@@ -797,9 +806,10 @@ TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) {
TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
unittest::TempDir tempDir(_agent.getSuiteName() + "_" + _agent.getTestName());
- Options opts;
- opts.tempDir = tempDir.path();
- opts.maxMemoryUsageBytes = sizeof(IWSorter::Data);
+ auto opts = SortOptions()
+ .ExtSortAllowed()
+ .TempDir(tempDir.path())
+ .MaxMemoryUsageBytes(sizeof(IWSorter::Data));
IWPair pairInsertedBeforeShutdown(1, 100);
@@ -809,7 +819,7 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
IWSorter::PersistedState state;
{
auto sorterBeforeShutdown =
- sorter::make<IntWrapper, IntWrapper>("sorter-test", opts, IWComparator(ASC));
+ std::unique_ptr<IWSorter>(IWSorter::make(opts, IWComparator(ASC)));
sorterBeforeShutdown->add(pairInsertedBeforeShutdown.first,
pairInsertedBeforeShutdown.second);
state = sorterBeforeShutdown->persistDataForShutdown();
@@ -819,8 +829,8 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
}
// On restart, reconstruct sorter from persisted state.
- auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(
- state.fileName, state.ranges, opts, IWComparator(ASC));
+ auto sorter = std::unique_ptr<IWSorter>(
+ IWSorter::makeFromExistingRanges(state.fileName, state.ranges, opts, IWComparator(ASC)));
// The number of spills is set when NoLimitSorter is constructed from existing ranges.
ASSERT_EQ(state.ranges.size(), sorter->numSpills());
@@ -829,12 +839,13 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
IWPair pairInsertedAfterStartup(2, 200);
sorter->add(pairInsertedAfterStartup.first, pairInsertedAfterStartup.second);
- // Only the pair added after reconstructing the Sorter is counted.
- ASSERT_EQ(1, sorter->numSorted());
+ // Technically this sorter has not sorted anything. It is just merging files.
+ ASSERT_EQ(0, sorter->numSorted());
// Read data from sorter.
{
auto iter = std::unique_ptr<IWIterator>(sorter->done());
+ iter->openSource();
ASSERT(iter->more());
auto pair1 = iter->next();
@@ -851,8 +862,10 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
<< pair2.first << "/" << pair2.second;
ASSERT_FALSE(iter->more());
+ iter->closeSource();
}
}
} // namespace
-} // namespace mongo::sorter
+} // namespace sorter
+} // namespace mongo
diff --git a/src/mongo/db/sorter/spillable_sorter.h b/src/mongo/db/sorter/spillable_sorter.h
deleted file mode 100644
index 724194ebe2c..00000000000
--- a/src/mongo/db/sorter/spillable_sorter.h
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/in_mem_iterator.h"
-#include "mongo/db/sorter/merge_iterator.h"
-#include "mongo/db/sorter/sorted_file_writer.h"
-#include "mongo/db/sorter/sorter.h"
-
-namespace mongo::sorter {
-/**
- * A Sorter which may spill to disk if configured to do so. Each instance of this class will, if
- * spilling is enabled, generate a file name and spill sorted data ranges to that file.
- */
-template <typename Key, typename Value>
-class SpillableSorter : public Sorter<Key, Value> {
-public:
- using Base = Sorter<Key, Value>;
- using Data = typename Base::Data;
- using Iterator = typename Base::Iterator;
- using CompFn = typename Base::CompFn;
- using Settings = typename Base::Settings;
-
- using Base::_comp;
- using Base::_done;
-
- SpillableSorter(StringData name,
- const Options& options,
- const CompFn& comp,
- const Settings& settings)
- : SpillableSorter(options,
- comp,
- settings,
- options.tempDir
- ? std::make_unique<File>(*options.tempDir + "/" + nextFileName(name))
- : nullptr) {}
-
- SpillableSorter(const Options& options,
- const CompFn& comp,
- const Settings& settings,
- const std::string& fileName)
- : SpillableSorter(
- options, comp, settings, std::make_unique<File>(*options.tempDir + "/" + fileName)) {
- invariant(!fileName.empty());
- }
-
- std::unique_ptr<Iterator> done(typename Iterator::ReturnPolicy returnPolicy) {
- invariant(!std::exchange(_done, true));
-
- if (_spilled.empty()) {
- _sort();
- return std::make_unique<InMemIterator<Key, Value>>(_data, returnPolicy);
- }
-
- _spill();
- return std::make_unique<MergeIterator<Key, Value>>(_spilled, _options.limit, _comp);
- }
-
- size_t numSpills() const override {
- return _spilled.size();
- }
-
-protected:
- virtual void _sort() = 0;
-
- void _spill() {
- if (_data.empty()) {
- return;
- }
-
- if (!_options.tempDir) {
- // This error message only applies to sorts from user queries made through the find or
- // aggregation commands. Other clients should suppress this error, either by allowing
- // external sorting or by catching and throwing a more appropriate error.
- uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- str::stream()
- << "Sort exceeded memory limit of " << _options.maxMemoryUsageBytes
- << " bytes, but did not opt in to external sorting.");
- }
-
- _sort();
- _updateStateAfterSort();
-
- SortedFileWriter<Key, Value> writer(_file.get(), _options.dbName, _settings);
- for (size_t i = 0; i < _data.size(); i++) {
- writer.addAlreadySorted(_data[i].first, _data[i].second);
- }
- _spilled.push_back(writer.done());
-
- // Clear _data and release backing array's memory.
- std::vector<Data>().swap(_data);
- _memUsed = 0;
- }
-
- virtual void _updateStateAfterSort() {}
-
- const Options _options;
- const std::function<bool(const Data&, const Data&)> _less;
- const Settings _settings;
-
- std::vector<Data> _data;
-
- std::unique_ptr<File> _file;
- std::vector<std::unique_ptr<Iterator>> _spilled;
-
- size_t _memUsed = 0;
-
-private:
- SpillableSorter(const Options& options,
- const CompFn& comp,
- const Settings& settings,
- std::unique_ptr<File> file)
- : Base(comp),
- _options(options),
- _less([comp](const Data& lhs, const Data& rhs) {
- dassertCompIsSane(comp, lhs, rhs);
- return comp(lhs, rhs) < 0;
- }),
- _settings(settings),
- _file(std::move(file)) {}
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/top_k_sorter.h b/src/mongo/db/sorter/top_k_sorter.h
deleted file mode 100644
index 9a0569171fb..00000000000
--- a/src/mongo/db/sorter/top_k_sorter.h
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/sorter/sorter.h"
-
-#include "mongo/db/sorter/spillable_sorter.h"
-
-namespace mongo::sorter {
-template <typename Key, typename Value>
-class TopKSorter : public SpillableSorter<Key, Value> {
-public:
- using Base = SpillableSorter<Key, Value>;
- using Data = typename Base::Data;
- using Iterator = typename Base::Iterator;
- using CompFn = typename Base::CompFn;
- using Settings = typename Base::Settings;
-
- using Base::_data;
- using Base::_done;
- using Base::_less;
- using Base::_memUsed;
- using Base::_numSorted;
- using Base::_options;
- using Base::_spill;
- using Base::_totalDataSizeSorted;
-
- TopKSorter(StringData name,
- const Options& options,
- const CompFn& comp,
- const Settings& settings)
- : Base(name, options, comp, settings) {
- // This also works with limit 1, but LimitOneSorter should be used instead for that case.
- invariant(options.limit > 1);
-
- // Preallocate a fixed sized vector of the required size if we don't expect it to have a
- // major impact on our memory budget. This is the common case with small limits.
- if (options.limit < std::min((options.maxMemoryUsageBytes / 10) /
- sizeof(typename decltype(_data)::value_type),
- _data.max_size())) {
- _data.reserve(options.limit);
- }
- }
-
- void add(const Key& key, const Value& val) {
- invariant(!_done);
-
- ++_numSorted;
-
- Data contender{key, val};
-
- if (_data.size() < _options.limit) {
- if (_haveCutoff && !_less(contender, _cutoff)) {
- return;
- }
-
- _data.emplace_back(contender.first.getOwned(), contender.second.getOwned());
-
- auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
- _memUsed += memUsage;
- _totalDataSizeSorted += memUsage;
-
- if (_data.size() == _options.limit) {
- std::make_heap(_data.begin(), _data.end(), _less);
- }
-
- if (_memUsed > _options.maxMemoryUsageBytes) {
- _spill();
- }
-
- return;
- }
-
- invariant(_data.size() == _options.limit);
-
- if (!_less(contender, _data.front())) {
- return;
- }
-
- // Remove the old worst pair and insert the contender, adjusting _memUsed.
-
- auto memUsage = key.memUsageForSorter() + val.memUsageForSorter();
- _memUsed += memUsage;
- _totalDataSizeSorted += memUsage;
-
- _memUsed -= _data.front().first.memUsageForSorter();
- _memUsed -= _data.front().second.memUsageForSorter();
-
- std::pop_heap(_data.begin(), _data.end(), _less);
- _data.back() = {contender.first.getOwned(), contender.second.getOwned()};
- std::push_heap(_data.begin(), _data.end(), _less);
-
- if (_memUsed > _options.maxMemoryUsageBytes) {
- _spill();
- }
- }
-
-private:
- void _sort() {
- if (_data.size() == _options.limit) {
- std::sort_heap(_data.begin(), _data.end(), _less);
- } else {
- std::stable_sort(_data.begin(), _data.end(), _less);
- }
- }
-
- // Can only be called after _data is sorted
- void _updateStateAfterSort() override {
- // Theory of operation: We want to be able to eagerly ignore values we know will not
- // be in the TopK result set by setting _cutoff to a value we know we have at least
- // K values equal to or better than. There are two values that we track to
- // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When
- // one of these values becomes the new _cutoff, its associated counter is reset to 0
- // and a new value is chosen for that member the next time we spill.
- //
- // _worstSeen is the worst value we've seen so that all kept values are better than
- // (or equal to) it. This means that once _worstCount >= _opts.limit there is no
- // reason to consider values worse than _worstSeen so it can become the new _cutoff.
- // This technique is especially useful when the input is already roughly sorted (eg
- // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff
- // that will exclude most later values, making the full TopK operation including
- // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time.
- //
- // _lastMedian was the median of the _data in the first spill() either overall or
- // following a promotion of _lastMedian to _cutoff. We count the number of kept
- // values that are better than or equal to _lastMedian in _medianCount and can
- // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming
- // reasonable median selection (which should happen when the data is completely
- // unsorted), after the first K spilled values, we will keep roughly 50% of the
- // incoming values, 25% after the second K, 12.5% after the third K, etc. This means
- // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values,
- // so the expected number of kept values is O(Log(N/K) * K). The final run time if
- // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) +
- // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)).
- //
- // This leaves a currently unoptimized worst case of data that is already roughly
- // sorted, but in the wrong direction, such that the desired results are all the
- // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this
- // should be trivially detectable, as a future optimization it might be nice to
- // detect this case and reverse the direction of input (if possible) which would
- // turn this into the best case described above.
- //
- // Pedantic notes: The time complexities above (which count number of comparisons)
- // ignore the sorting of batches prior to spilling to disk since they make it more
- // confusing without changing the results. If you want to add them back in, add an
- // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also,
- // all space complexities measure disk space rather than memory since this class is
- // O(1) in memory due to the _opts.maxMemoryUsageBytes limit.
-
- // Pick a new _worstSeen or _lastMedian if should.
- if (_worstCount == 0 || _less(_worstSeen, _data.back())) {
- _worstSeen = _data.back();
- }
- if (_medianCount == 0) {
- size_t medianIndex = _data.size() / 2; // Chooses the higher if size is even.
- _lastMedian = _data[medianIndex];
- }
-
- // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian.
- _worstCount += _data.size(); // Everything is better or equal.
- typename std::vector<Data>::iterator firstWorseThanLastMedian =
- std::upper_bound(_data.begin(), _data.end(), _lastMedian, _less);
- _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian);
-
-
- // Promote _worstSeen or _lastMedian to _cutoff and reset counters if we should.
- if (_worstCount >= _options.limit) {
- if (!_haveCutoff || _less(_worstSeen, _cutoff)) {
- _cutoff = _worstSeen;
- _haveCutoff = true;
- }
- _worstCount = 0;
- }
- if (_medianCount >= _options.limit) {
- if (!_haveCutoff || _less(_lastMedian, _cutoff)) {
- _cutoff = _lastMedian;
- _haveCutoff = true;
- }
- _medianCount = 0;
- }
- }
-
- // See updateCutoff() for a full description of how these members are used.
- bool _haveCutoff = false;
- Data _cutoff; // We can definitely ignore values worse than this.
- Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit.
- size_t _worstCount = 0; // Number of docs better or equal to _worstSeen kept so far.
- Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit.
- size_t _medianCount = 0; // Number of docs better or equal to _lastMedian kept so far.
-};
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/util.cpp b/src/mongo/db/sorter/util.cpp
deleted file mode 100644
index f4cfc13e0f1..00000000000
--- a/src/mongo/db/sorter/util.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/sorter/util.h"
-
-#include "mongo/platform/random.h"
-
-namespace mongo::sorter {
-std::string nextFileName(StringData name) {
- static AtomicWord<unsigned> fileCounter;
- static const auto randomSuffix = SecureRandom{}.nextInt64();
- return str::stream() << "extsort-" << name << "." << fileCounter.fetchAndAdd(1) << '-'
- << randomSuffix;
-}
-
-uint32_t addDataToChecksum(const void* data, std::size_t size, uint32_t checksum) {
- uint32_t newChecksum;
- MurmurHash3_x86_32(data, size, checksum, &newChecksum);
- return newChecksum;
-}
-
-EncryptionHooks* getEncryptionHooksIfEnabled() {
- // Some tests may not run with a global service context.
- if (!hasGlobalServiceContext()) {
- return nullptr;
- }
- auto service = getGlobalServiceContext();
- auto encryptionHooks = EncryptionHooks::get(service);
- if (!encryptionHooks->enabled()) {
- return nullptr;
- }
- return encryptionHooks;
-}
-} // namespace mongo::sorter
diff --git a/src/mongo/db/sorter/util.h b/src/mongo/db/sorter/util.h
deleted file mode 100644
index 3ff9d17a1b9..00000000000
--- a/src/mongo/db/sorter/util.h
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <cstdint>
-#include <third_party/murmurhash3/MurmurHash3.h>
-
-#include "mongo/config.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/sorter/options.h"
-#include "mongo/db/storage/encryption_hooks.h"
-
-namespace mongo::sorter {
-/**
- * Generates a new file name on each call using a static, atomic and monotonically increasing
- * number. Each name is suffixed with a random number generated at startup, to prevent name
- * collisions when the external sort files are preserved across restarts.
- */
-std::string nextFileName(StringData name);
-
-/**
- * Calculates and returns a new murmur hash value based on the prior murmur hash and a new piece
- * of data.
- */
-uint32_t addDataToChecksum(const void* data, std::size_t size, uint32_t checksum);
-
-/**
- * Returns the current EncryptionHooks registered with the global service context.
- * Returns nullptr if the service context is not available; or if the EncyptionHooks
- * registered is not enabled.
- */
-EncryptionHooks* getEncryptionHooksIfEnabled();
-
-template <typename Data>
-void dassertCompIsSane(const std::function<int(const Data& lhs, const Data& rhs)>& comp,
- const Data& lhs,
- const Data& rhs) {
-#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER)
- // MSVC++ already does similar verification in debug mode in addition to using algorithms that
- // do more comparisons. Doing our own verification in addition makes debug builds considerably
- // slower without any additional safety.
-
- // Test reversed comparisons.
- const int regular = comp(lhs, rhs);
- if (regular == 0) {
- invariant(comp(rhs, lhs) == 0);
- } else if (regular < 0) {
- invariant(comp(rhs, lhs) > 0);
- } else {
- invariant(comp(rhs, lhs) < 0);
- }
-
- // Test reflexivity.
- invariant(comp(lhs, lhs) == 0);
- invariant(comp(rhs, rhs) == 0);
-#endif
-}
-} // namespace mongo::sorter