diff options
author | Yuhong Zhang <danielzhangyh@gmail.com> | 2021-12-29 20:53:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-30 04:09:14 +0000 |
commit | 23166382f8f7d7c9ed322e590e5020d37a29b540 (patch) | |
tree | 5aca5561e63a12381996333d592bfae03548ee67 /src/mongo/db/sorter | |
parent | 91d9b5a0d92784178c1f01e6a5766e6366f1c797 (diff) | |
download | mongo-23166382f8f7d7c9ed322e590e5020d37a29b540.tar.gz |
Revert "SERVER-62056 Improve `Sorter` code structure"
This reverts commit 4cfcc10775e2cab05a6e30c2516994ab67b9bd7d.
Diffstat (limited to 'src/mongo/db/sorter')
23 files changed, 1805 insertions, 2198 deletions
diff --git a/src/mongo/db/sorter/SConscript b/src/mongo/db/sorter/SConscript index 491a2a67d97..26084eca010 100644 --- a/src/mongo/db/sorter/SConscript +++ b/src/mongo/db/sorter/SConscript @@ -16,28 +16,8 @@ sorterEnv.CppUnitTest( '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/third_party/shim_snappy', - 'sorter', - ], -) - -sorterEnv.Library( - target='sorter', - source=[ - 'compression.cpp', - 'file.cpp', - 'util.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/idl/idl_parser', - '$BUILD_DIR/third_party/shim_snappy', 'sorter_idl', ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/db/storage/encryption_hooks', - '$BUILD_DIR/mongo/s/is_mongos', - ], ) env.Library( diff --git a/src/mongo/db/sorter/compression.cpp b/src/mongo/db/sorter/compression.cpp deleted file mode 100644 index cd5e3b07c51..00000000000 --- a/src/mongo/db/sorter/compression.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/sorter/compression.h" - -#include <snappy.h> - -namespace mongo::sorter { -bool compress(const char* buffer, std::size_t size, std::string* output) { - return snappy::Compress(buffer, size, output); -} - -bool decompress(const char* buffer, std::size_t size, char* output) { - return snappy::RawUncompress(buffer, size, output); -} - -bool getUncompressedSize(const char* buffer, std::size_t size, std::size_t* result) { - return snappy::GetUncompressedLength(buffer, size, result); -} - -bool isValidCompressedBuffer(const char* buffer, std::size_t size) { - return snappy::IsValidCompressedBuffer(buffer, size); -} -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/compression.h b/src/mongo/db/sorter/compression.h deleted file mode 100644 index 7afe26c137c..00000000000 --- a/src/mongo/db/sorter/compression.h +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <cstdint> -#include <string> - -namespace mongo::sorter { -bool compress(const char* buffer, std::size_t size, std::string* output); - -bool decompress(const char* buffer, std::size_t size, char* output); - -bool getUncompressedSize(const char* buffer, std::size_t size, std::size_t* result); - -bool isValidCompressedBuffer(const char* buffer, std::size_t size); -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/factory.h b/src/mongo/db/sorter/factory.h deleted file mode 100644 index 40b3158f09c..00000000000 --- a/src/mongo/db/sorter/factory.h +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/limit_one_sorter.h" -#include "mongo/db/sorter/no_limit_sorter.h" -#include "mongo/db/sorter/top_k_sorter.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -std::unique_ptr<Sorter<Key, Value>> make( - StringData name, - const Options& options, - const typename Sorter<Key, Value>::CompFn& comp, - const typename Sorter<Key, Value>::Settings& settings = {}) { - switch (options.limit) { - case 0: - return std::make_unique<NoLimitSorter<Key, Value>>(name, options, comp, settings); - case 1: - return std::make_unique<LimitOneSorter<Key, Value>>(comp); - default: - return std::make_unique<TopKSorter<Key, Value>>(name, options, comp, settings); - } -} - -template <typename Key, typename Value> -std::unique_ptr<Sorter<Key, Value>> makeFromExistingRanges( - const std::string& fileName, - const std::vector<SorterRange>& ranges, - const Options& options, - const typename Sorter<Key, Value>::CompFn& comp, - const typename Sorter<Key, Value>::Settings& settings = {}) { - invariant(options.tempDir); - invariant(options.limit == 0, - str::stream() << "Creating a Sorter from existing ranges is only available with the " - "NoLimitSorter (limit 0), but got limit " - << options.limit); - - return std::make_unique<NoLimitSorter<Key, Value>>(fileName, ranges, options, comp, settings); -} -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/file.cpp b/src/mongo/db/sorter/file.cpp deleted file mode 100644 index f32d221c79c..00000000000 --- a/src/mongo/db/sorter/file.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/sorter/file.h" - -#include <boost/filesystem/operations.hpp> - -#include "mongo/util/destructor_guard.h" - -namespace mongo::sorter { -namespace { -// We need to use the "real" errno everywhere, not GetLastError() on Windows. -std::string getError() { - int errnoCopy = errno; - StringBuilder sb; - sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy); - return sb.str(); -} -} // namespace - -File::~File() { - if (_keep) { - return; - } - - if (_file.is_open()) { - DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit)); - DESTRUCTOR_GUARD(_file.close()); - } - - DESTRUCTOR_GUARD(boost::filesystem::remove(_path)); -} - -void File::read(std::streamoff offset, std::streamsize size, void* out) { - if (!_file.is_open()) { - _open(); - } - - if (_offset != -1) { - _file.exceptions(std::ios::goodbit); - _file.flush(); - _offset = -1; - - uassert(5479100, - str::stream() << "Error flushing file " << _path.string() << ": " << getError(), - _file); - } - - _file.seekg(offset); - _file.read(reinterpret_cast<char*>(out), size); - - uassert(16817, - str::stream() << "Error reading file " << _path.string() << ": " << getError(), - _file); - - invariant(_file.gcount() == size, - str::stream() << "Number of bytes read (" << _file.gcount() - << ") not equal to expected number (" << size << ")"); - - uassert(51049, - str::stream() << "Error reading file " << _path.string() << ": " << getError(), - _file.tellg() >= 0); -} - -void File::write(const char* data, std::streamsize size) { - _ensureOpenForWriting(); - - try { - _file.write(data, size); - _offset += size; - } catch (const std::system_error& ex) { - if (ex.code() == std::errc::no_space_on_device) { - uasserted(ErrorCodes::OutOfDiskSpace, - str::stream() << ex.what() << ": " << _path.string()); - } - uasserted(5642403, - str::stream() << "Error writing to file " << _path.string() << ": " - << getError()); - } catch (const std::exception&) { - uasserted(16821, - str::stream() << "Error writing to file " << _path.string() << ": " - << getError()); - } -} - -std::streamoff File::currentOffset() { - _ensureOpenForWriting(); - return _offset; -} - -void File::_open() { - invariant(!_file.is_open()); - - boost::filesystem::create_directories(_path.parent_path()); - - // We open the provided file in append mode so that SortedFileWriter instances can share - // the same file, used serially. We want to share files in order to stay below system - // open file limits. - _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out); - - uassert(16818, - str::stream() << "Error opening file " << _path.string() << ": " << getError(), - _file.good()); -} - -void File::_ensureOpenForWriting() { - invariant(_offset != -1 || !_file.is_open()); - - if (_file.is_open()) { - return; - } - - _open(); - _file.exceptions(std::ios::failbit | std::ios::badbit); - _offset = boost::filesystem::file_size(_path); -} -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/file.h b/src/mongo/db/sorter/file.h deleted file mode 100644 index e47bf353bef..00000000000 --- a/src/mongo/db/sorter/file.h +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/filesystem/path.hpp> -#include <fstream> -#include <string> -#include <utility> - -#include "mongo/util/assert_util.h" - -namespace mongo::sorter { -/** - * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or - * reading without any writing), but does not support writing after any reading has been done. - */ -class File { -public: - File(std::string path) : _path(std::move(path)) { - invariant(!_path.empty()); - } - - ~File(); - - const boost::filesystem::path& path() const { - return _path; - } - - /** - * Signals that the on-disk file should not be cleaned up. - */ - void keep() { - _keep = true; - }; - - /** - * Reads the requested data from the file. Cannot write more to the file once this has been - * called. - */ - void read(std::streamoff offset, std::streamsize size, void* out); - - /** - * Writes the given data to the end of the file. Cannot be called after reading. - */ - void write(const char* data, std::streamsize size); - - /** - * Returns the current offset of the end of the file. Cannot be called after reading. - */ - std::streamoff currentOffset(); - -private: - void _open(); - - void _ensureOpenForWriting(); - - boost::filesystem::path _path; - std::fstream _file; - - // The current offset of the end of the file, or -1 if the file either has not yet been - // opened or is already being read. - std::streamoff _offset = -1; - - // Whether to keep the on-disk file even after this in-memory object has been destructed. - bool _keep = false; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/file_iterator.h b/src/mongo/db/sorter/file_iterator.h deleted file mode 100644 index 3e740584353..00000000000 --- a/src/mongo/db/sorter/file_iterator.h +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorted_data_iterator.h" - -#include "mongo/db/sorter/compression.h" -#include "mongo/db/sorter/file.h" -#include "mongo/db/sorter/util.h" - -namespace mongo::sorter { -/** - * Iterates over a sorted range within a file. - */ -template <typename Key, typename Value> -class FileIterator : public SortedDataIterator<Key, Value> { -public: - using Base = SortedDataIterator<Key, Value>; - using Data = typename Base::Data; - using Settings = typename Base::Settings; - - FileIterator(File* file, - std::streamoff fileStartOffset, - std::streamoff fileEndOffset, - const uint32_t checksum, - const Settings& settings, - const boost::optional<std::string>& dbName) - : _settings(settings), - _file(file), - _fileStartOffset(fileStartOffset), - _fileCurrentOffset(fileStartOffset), - _fileEndOffset(fileEndOffset), - _originalChecksum(checksum), - _dbName(dbName) {} - - ~FileIterator() { - // If the file iterator reads through all data objects, we can ensure non-corrupt data by - // comparing the newly calculated checksum with the original checksum from the data written - // to disk. Some iterators do not read back all data from the file, which prohibits the - // _afterReadChecksum from obtaining all the information needed. Thus, we only fassert if - // all data that was written to disk is read back and the checksums are not equivalent. - if (!more() && _bufferReader->atEof() && (_originalChecksum != _afterReadChecksum)) { - fassert(31182, - Status(ErrorCodes::Error::ChecksumMismatch, - "Data read from disk does not match what was written to disk. Possible " - "corruption of data.")); - } - } - - bool more() const { - return !_bufferReader || !_bufferReader->atEof() || _fileCurrentOffset < _fileEndOffset; - } - - Data next() { - if (!_bufferReader || _bufferReader->atEof()) { - _fillBuffer(); - } - - const char* startOfNewData = static_cast<const char*>(_bufferReader->pos()); - - // Note: calling read() on the _bufferReader buffer in the deserialize function advances the - // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function - // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into - // the Data constructor - auto first = Key::deserializeForSorter(*_bufferReader, _settings.first); - auto second = Value::deserializeForSorter(*_bufferReader, _settings.second); - - // The difference of _bufferReader's position before and after reading the data - // will provide the length of the data that was just read. - const char* endOfNewData = static_cast<const char*>(_bufferReader->pos()); - - _afterReadChecksum = - addDataToChecksum(startOfNewData, endOfNewData - startOfNewData, _afterReadChecksum); - - return {std::move(first), std::move(second)}; - } - - SorterRange getRange() const override { - return {_fileStartOffset, _fileEndOffset, _originalChecksum}; - } - -private: - /** - * Fills the buffer by reading from disk. - */ - void _fillBuffer() { - int32_t rawSize; - _read(&rawSize, sizeof(rawSize)); - - // Negative size means compressed. - const bool compressed = rawSize < 0; - int32_t blockSize = std::abs(rawSize); - - _buffer.reset(new char[blockSize]); - _read(_buffer.get(), blockSize); - - if (auto encryptionHooks = getEncryptionHooksIfEnabled()) { - std::unique_ptr<char[]> out(new char[blockSize]); - size_t outLen; - Status status = - encryptionHooks->unprotectTmpData(reinterpret_cast<const uint8_t*>(_buffer.get()), - blockSize, - reinterpret_cast<uint8_t*>(out.get()), - blockSize, - &outLen, - _dbName); - uassert(28841, - str::stream() << "Failed to unprotect data: " << status.toString(), - status.isOK()); - blockSize = outLen; - _buffer.swap(out); - } - - if (!compressed) { - _bufferReader.reset(new BufReader(_buffer.get(), blockSize)); - return; - } - - dassert(isValidCompressedBuffer(_buffer.get(), blockSize)); - - size_t uncompressedSize; - uassert(17061, - "Failed to get uncompressed size", - getUncompressedSize(_buffer.get(), blockSize, &uncompressedSize)); - - std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]); - uassert(17062, - "Failed to decompress", - decompress(_buffer.get(), blockSize, decompressionBuffer.get())); - - // Hold on to decompressed data and throw out compressed data at block exit. - _buffer.swap(decompressionBuffer); - _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize)); - } - - /** - * Reads data from disk. - */ - void _read(void* out, size_t size) { - invariant(_fileCurrentOffset < _fileEndOffset, - str::stream() << "Current file offset (" << _fileCurrentOffset - << ") greater than end offset (" << _fileEndOffset << ")"); - - _file->read(_fileCurrentOffset, size, out); - _fileCurrentOffset += size; - } - - const Settings _settings; - - std::unique_ptr<char[]> _buffer; - std::unique_ptr<BufReader> _bufferReader; - - File* _file; // File containing the sorted data range. - std::streamoff _fileStartOffset; // File offset at which the sorted data range starts. - std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from. - std::streamoff _fileEndOffset; // File offset at which the sorted data range ends. - - // Checksum value retrieved from SortedFileWriter that was calculated as data was spilled - // to disk. This is not modified, and is only used for comparison against _afterReadChecksum - // when the FileIterator is exhausted to ensure no data corruption. - const uint32_t _originalChecksum; - - // Checksum value that is updated with each read of a data object from disk. We can compare - // this value with _originalChecksum to check for data corruption if and only if the - // FileIterator is exhausted. - uint32_t _afterReadChecksum = 0; - - boost::optional<std::string> _dbName; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/in_mem_iterator.h b/src/mongo/db/sorter/in_mem_iterator.h deleted file mode 100644 index 03f1ed98dd7..00000000000 --- a/src/mongo/db/sorter/in_mem_iterator.h +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorted_data_iterator.h" - -namespace mongo::sorter { -/** - * Returns results from sorted in-memory storage. - */ -template <typename Key, typename Value> -class InMemIterator : public SortedDataIterator<Key, Value> { -public: - using Base = SortedDataIterator<Key, Value>; - using Data = typename Base::Data; - - using Base::_returnPolicy; - - InMemIterator(std::vector<Data>& data, typename Base::ReturnPolicy returnPolicy) - : Base(returnPolicy), _it(data.begin()), _end(data.end()) {} - - bool more() const { - return _it != _end; - } - - Data next() { - switch (_returnPolicy) { - case Base::ReturnPolicy::kCopy: - return *_it++; - case Base::ReturnPolicy::kMove: - return std::move(*_it++); - } - MONGO_UNREACHABLE; - } - -private: - typename std::vector<Data>::iterator _it; - typename std::vector<Data>::iterator _end; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/limit_one_sorter.h b/src/mongo/db/sorter/limit_one_sorter.h deleted file mode 100644 index d16c829a59c..00000000000 --- a/src/mongo/db/sorter/limit_one_sorter.h +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorter.h" - -#include "mongo/db/sorter/single_elem_iterator.h" -#include "mongo/db/sorter/util.h" -#include "mongo/util/assert_util.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class LimitOneSorter : public Sorter<Key, Value> { -public: - using Base = Sorter<Key, Value>; - using Data = typename Base::Data; - using Iterator = typename Base::Iterator; - using CompFn = typename Base::CompFn; - - using Base::_comp; - using Base::_done; - using Base::_numSorted; - using Base::_totalDataSizeSorted; - - explicit LimitOneSorter(const CompFn& comp) : Base(comp) {} - - void add(const Key& key, const Value& val) { - invariant(!_done); - - ++_numSorted; - _totalDataSizeSorted += key.memUsageForSorter() + val.memUsageForSorter(); - - Data contender{key, val}; - - if (_haveData) { - dassertCompIsSane(_comp, _best, contender); - if (_comp(_best, contender) <= 0) { - return; - } - } else { - _haveData = true; - } - - _best = {contender.first.getOwned(), contender.second.getOwned()}; - } - - std::unique_ptr<Iterator> done(typename Iterator::ReturnPolicy returnPolicy) { - _done = true; - return _haveData ? std::make_unique<SingleElemIterator<Key, Value>>(_best, returnPolicy) - : std::make_unique<SingleElemIterator<Key, Value>>(returnPolicy); - } - -private: - Data _best; - bool _haveData = false; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/merge_iterator.h b/src/mongo/db/sorter/merge_iterator.h deleted file mode 100644 index d09b15185e0..00000000000 --- a/src/mongo/db/sorter/merge_iterator.h +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorted_data_iterator.h" -#include "mongo/db/sorter/util.h" - -namespace mongo::sorter { -/** - * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted - * ranges within the same file. - */ -template <typename Key, typename Value> -class MergeIterator : public SortedDataIterator<Key, Value> { -public: - using Base = SortedDataIterator<Key, Value>; - using Data = typename Base::Data; - using CompFn = std::function<int(const Data&, const Data&)>; - - MergeIterator(const std::vector<std::unique_ptr<Base>>& iters, - unsigned long long limit, - const CompFn& comp) - : _remaining(limit ? limit : std::numeric_limits<unsigned long long>::max()), - _greater([comp](const std::unique_ptr<Stream>& lhs, const std::unique_ptr<Stream>& rhs) { - dassertCompIsSane(comp, lhs->peek(), rhs->peek()); - int result = comp(lhs->peek(), rhs->peek()); - return result ? result > 0 : lhs->num() > rhs->num(); - }) { - for (size_t i = 0; i < iters.size(); ++i) { - if (iters[i]->more()) { - _heap.push_back(std::make_unique<Stream>(iters[i].get(), i)); - } - } - - if (_heap.empty()) { - _remaining = 0; - return; - } - - std::make_heap(_heap.begin(), _heap.end(), _greater); - std::pop_heap(_heap.begin(), _heap.end(), _greater); - _current = std::move(_heap.back()); - _heap.pop_back(); - } - - ~MergeIterator() { - _current.reset(); - _heap.clear(); - } - - bool more() const { - return _remaining > 0 && (_first || !_heap.empty() || _current->more()); - } - - Data next() { - invariant(more()); - - --_remaining; - - if (_first) { - _first = false; - return _current->current(); - } - - if (!_current->advance()) { - invariant(!_heap.empty()); - std::pop_heap(_heap.begin(), _heap.end(), _greater); - _current = std::move(_heap.back()); - _heap.pop_back(); - } else if (!_heap.empty() && _greater(_current, _heap.front())) { - std::pop_heap(_heap.begin(), _heap.end(), _greater); - _current.swap(_heap.back()); - std::push_heap(_heap.begin(), _heap.end(), _greater); - } - - return _current->current(); - } - - -private: - class Stream { - public: - Stream(Base* data, size_t num) : _data(data), _current(data->next()), _num(num) {} - - const Data& peek() const { - return _current; - } - - Data current() { - return std::move(_current); - } - - bool more() const { - return _data->more(); - } - - bool advance() { - if (!_data->more()) { - return false; - } - - _current = std::move(_data->next()); - return true; - } - - size_t num() const { - return _num; - } - - private: - Base* _data; - Data _current; - size_t _num; - }; - - unsigned long long _remaining; - bool _first = true; - std::unique_ptr<Stream> _current; - std::vector<std::unique_ptr<Stream>> _heap; // MinHeap - std::function<bool(const std::unique_ptr<Stream>& lhs, const std::unique_ptr<Stream>& rhs)> - _greater; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/no_limit_sorter.h b/src/mongo/db/sorter/no_limit_sorter.h deleted file mode 100644 index 71ee3070ad3..00000000000 --- a/src/mongo/db/sorter/no_limit_sorter.h +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/filesystem.hpp> - -#include "mongo/db/sorter/spillable_sorter.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class NoLimitSorter : public SpillableSorter<Key, Value> { -public: - using Base = SpillableSorter<Key, Value>; - using Data = typename Base::Data; - using Iterator = typename Base::Iterator; - using CompFn = typename Base::CompFn; - using Settings = typename Base::Settings; - - using Base::_data; - using Base::_done; - using Base::_file; - using Base::_less; - using Base::_memUsed; - using Base::_numSorted; - using Base::_options; - using Base::_settings; - using Base::_spill; - using Base::_spilled; - using Base::_totalDataSizeSorted; - - NoLimitSorter(StringData name, - const Options& options, - const CompFn& comp, - const Settings& settings) - : Base(name, options, comp, settings) { - invariant(options.limit == 0); - } - - NoLimitSorter(const std::string& fileName, - const std::vector<SorterRange>& ranges, - const Options& options, - const CompFn& comp, - const Settings& settings) - : Base(options, comp, settings, fileName) { - uassert(16815, - str::stream() << "Unexpected empty file: " << _file->path().string(), - ranges.empty() || boost::filesystem::file_size(_file->path()) != 0); - - _spilled.reserve(ranges.size()); - std::transform(ranges.begin(), - ranges.end(), - std::back_inserter(_spilled), - [this](const SorterRange& range) { - return std::make_unique<FileIterator<Key, Value>>(_file.get(), - range.getStartOffset(), - range.getEndOffset(), - range.getChecksum(), - _settings, - _options.dbName); - }); - } - - void add(const Key& key, const Value& value) { - addOwned(key.getOwned(), value.getOwned()); - } - - void addOwned(Key&& key, Value&& value) override { - invariant(!_done); - - ++_numSorted; - - auto memUsage = key.memUsageForSorter() + value.memUsageForSorter(); - _memUsed += memUsage; - _totalDataSizeSorted += memUsage; - - _data.emplace_back(std::move(key), std::move(value)); - - if (_memUsed > _options.maxMemoryUsageBytes) { - _spill(); - } - } - - typename Base::PersistedState persistDataForShutdown() override { - _spill(); - _file->keep(); - - std::vector<SorterRange> ranges; - ranges.reserve(_spilled.size()); - std::transform(_spilled.begin(), - _spilled.end(), - std::back_inserter(ranges), - [](const auto& it) { return it->getRange(); }); - - return {_file->path().filename().string(), std::move(ranges)}; - } - -private: - void _sort() { - std::stable_sort(_data.begin(), _data.end(), _less); - } -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/null_value.h b/src/mongo/db/sorter/null_value.h deleted file mode 100644 index 6f83a902957..00000000000 --- a/src/mongo/db/sorter/null_value.h +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/bson/util/builder.h" -#include "mongo/util/bufreader.h" - -namespace mongo::sorter { -/** - * 0-sized dummy object that satisfies Sorter's Key/Value interface. - */ -class NullValue { -public: - struct SorterDeserializeSettings {}; - - void serializeForSorter(BufBuilder&) const {} - - static NullValue deserializeForSorter(BufReader&, const SorterDeserializeSettings&) { - return {}; - } - - int memUsageForSorter() const { - return 0; - } - - NullValue getOwned() const { - return {}; - } -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/options.h b/src/mongo/db/sorter/options.h deleted file mode 100644 index ba5f261619d..00000000000 --- a/src/mongo/db/sorter/options.h +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/optional.hpp> - -namespace mongo::sorter { -struct Options { - // The number of KV pairs to be returned. 0 indicates no limit. - unsigned long long limit = 0; - - // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate. - size_t maxMemoryUsageBytes = 64 * 1024 * 1024; - - // Whether we are allowed to spill to disk. If this is none and in-memory exceeds - // maxMemoryUsageBytes, we will uassert. - boost::optional<std::string> tempDir; - - // In case the sorter spills encrypted data to disk that must be readable even after process - // restarts, it must encrypt with a persistent key. This key is accessed using the database - // name that the sorted collection lives in. If encryption is enabled and dbName is boost::none, - // a temporary key is used. - boost::optional<std::string> dbName; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/single_elem_iterator.h b/src/mongo/db/sorter/single_elem_iterator.h deleted file mode 100644 index 2764630a78a..00000000000 --- a/src/mongo/db/sorter/single_elem_iterator.h +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorted_data_iterator.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class SingleElemIterator : public SortedDataIterator<Key, Value> { -public: - using Base = SortedDataIterator<Key, Value>; - using Data = typename Base::Data; - - using Base::_returnPolicy; - - SingleElemIterator(typename Base::ReturnPolicy returnPolicy) : Base(returnPolicy) {} - - SingleElemIterator(Data& data, typename Base::ReturnPolicy returnPolicy) - : Base(returnPolicy), _data(&data) {} - - bool more() const { - return _data; - } - - Data next() { - switch (_returnPolicy) { - case Base::ReturnPolicy::kCopy: - return *std::exchange(_data, nullptr); - case Base::ReturnPolicy::kMove: - return std::move(*std::exchange(_data, nullptr)); - } - MONGO_UNREACHABLE; - } - -private: - Data* _data = nullptr; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/sorted_data_iterator.h b/src/mongo/db/sorter/sorted_data_iterator.h deleted file mode 100644 index a63e77ddccf..00000000000 --- a/src/mongo/db/sorter/sorted_data_iterator.h +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <memory> -#include <vector> - -#include "mongo/db/sorter/options.h" -#include "mongo/db/sorter/sorter_gen.h" -#include "mongo/util/assert_util.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class SortedDataIterator { -public: - using Data = std::pair<Key, Value>; - using Settings = std::pair<typename Key::SorterDeserializeSettings, - typename Value::SorterDeserializeSettings>; - - enum class ReturnPolicy { - kCopy, - kMove, - }; - - SortedDataIterator(ReturnPolicy returnPolicy = ReturnPolicy::kMove) - : _returnPolicy(returnPolicy) {} - - SortedDataIterator(const SortedDataIterator&) = delete; - SortedDataIterator& operator=(const SortedDataIterator&) = delete; - - virtual ~SortedDataIterator() {} - - virtual bool more() const = 0; - - virtual Data next() = 0; - - virtual SorterRange getRange() const { - MONGO_UNREACHABLE; - } - -protected: - ReturnPolicy _returnPolicy; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/sorted_file_writer.h b/src/mongo/db/sorter/sorted_file_writer.h deleted file mode 100644 index ba82676e49b..00000000000 --- a/src/mongo/db/sorter/sorted_file_writer.h +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/file.h" -#include "mongo/db/sorter/file_iterator.h" -#include "mongo/db/sorter/options.h" -#include "mongo/db/sorter/sorted_data_iterator.h" -#include "mongo/db/sorter/util.h" -#include "mongo/s/is_mongos.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class SortedFileWriter { -public: - using Iterator = SortedDataIterator<Key, Value>; - using Settings = std::pair<typename Key::SorterDeserializeSettings, - typename Value::SorterDeserializeSettings>; - - SortedFileWriter(File* file, - const boost::optional<std::string>& dbName = boost::none, - const Settings& settings = Settings()) - : _settings(settings), - _file(file), - _fileStartOffset(_file->currentOffset()), - _dbName(dbName) { - invariant(!isMongos()); - } - - SortedFileWriter(const SortedFileWriter&) = delete; - SortedFileWriter& operator=(const SortedFileWriter&) = delete; - - void addAlreadySorted(const Key& key, const Value& val) { - // Offset that points to the place in the buffer where a new data object will be stored. - int nextObjPos = _buffer.len(); - - // Add serialized key and value to the buffer. - key.serializeForSorter(_buffer); - val.serializeForSorter(_buffer); - - // Serializing the key and value grows the buffer, but _buffer.buf() still points to the - // beginning. Use _buffer.len() to determine portion of buffer containing new datum. - _checksum = - addDataToChecksum(_buffer.buf() + nextObjPos, _buffer.len() - nextObjPos, _checksum); - - if (_buffer.len() > 64 * 1024) { - _spill(); - } - } - - std::unique_ptr<Iterator> done() { - _spill(); - - return std::make_unique<FileIterator<Key, Value>>( - _file, _fileStartOffset, _file->currentOffset(), _checksum, _settings, _dbName); - } - -private: - void _spill() { - int32_t size = _buffer.len(); - char* outBuffer = _buffer.buf(); - - if (size == 0) { - return; - } - - std::string compressed; - compress(outBuffer, size, &compressed); - invariant(compressed.size() <= size_t(std::numeric_limits<int32_t>::max())); - - const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9); - if (shouldCompress) { - size = compressed.size(); - outBuffer = const_cast<char*>(compressed.data()); - } - - std::unique_ptr<char[]> out; - if (auto encryptionHooks = getEncryptionHooksIfEnabled()) { - size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer(); - out.reset(new char[protectedSizeMax]); - size_t resultLen; - Status status = - encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer), - size, - reinterpret_cast<uint8_t*>(out.get()), - protectedSizeMax, - &resultLen, - _dbName); - uassert(28842, - str::stream() << "Failed to compress data: " << status.toString(), - status.isOK()); - outBuffer = out.get(); - size = resultLen; - } - - // Negative size means compressed. - size = shouldCompress ? -size : size; - _file->write(reinterpret_cast<const char*>(&size), sizeof(size)); - _file->write(outBuffer, std::abs(size)); - - _buffer.reset(); - } - - const Settings _settings; - File* _file; - BufBuilder _buffer; - - // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator - // to ensure data has not been corrupted after reading from disk. - uint32_t _checksum = 0; - - // Tracks where in the file we started writing the sorted data range so that the information can - // be given to the Iterator in done(). - std::streamoff _fileStartOffset; - - boost::optional<std::string> _dbName; -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp new file mode 100644 index 00000000000..e8fece6cff2 --- /dev/null +++ b/src/mongo/db/sorter/sorter.cpp @@ -0,0 +1,1231 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +/** + * This is the implementation for the Sorter. + * + * It is intended to be included in other cpp files like this: + * + * #include <normal/include/files.h> + * + * #include "mongo/db/sorter/sorter.h" + * + * namespace mongo { + * // Your code + * } + * + * #include "mongo/db/sorter/sorter.cpp" + * MONGO_CREATE_SORTER(MyKeyType, MyValueType, MyComparatorType); + * + * Do this once for each unique set of parameters to MONGO_CREATE_SORTER. + */ + +#include "mongo/db/sorter/sorter.h" + +#include <boost/filesystem/operations.hpp> +#include <snappy.h> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/config.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/encryption_hooks.h" +#include "mongo/db/storage/storage_options.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/overflow_arithmetic.h" +#include "mongo/s/is_mongos.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/destructor_guard.h" +#include "mongo/util/str.h" + +namespace mongo { + +namespace { + +/** + * Calculates and returns a new murmur hash value based on the prior murmur hash and a new piece + * of data. + */ +uint32_t addDataToChecksum(const void* startOfData, size_t sizeOfData, uint32_t checksum) { + unsigned newChecksum; + MurmurHash3_x86_32(startOfData, sizeOfData, checksum, &newChecksum); + return newChecksum; +} + +void checkNoExternalSortOnMongos(const SortOptions& opts) { + // This should be checked by consumers, but if it isn't try to fail early. + uassert(16947, + "Attempting to use external sort from mongos. This is not allowed.", + !(isMongos() && opts.extSortAllowed)); +} + +/** + * Returns the current EncryptionHooks registered with the global service context. + * Returns nullptr if the service context is not available; or if the EncyptionHooks + * registered is not enabled. + */ +EncryptionHooks* getEncryptionHooksIfEnabled() { + // Some tests may not run with a global service context. + if (!hasGlobalServiceContext()) { + return nullptr; + } + auto service = getGlobalServiceContext(); + auto encryptionHooks = EncryptionHooks::get(service); + if (!encryptionHooks->enabled()) { + return nullptr; + } + return encryptionHooks; +} + +} // namespace + +namespace sorter { + +// We need to use the "real" errno everywhere, not GetLastError() on Windows +inline std::string myErrnoWithDescription() { + int errnoCopy = errno; + StringBuilder sb; + sb << "errno:" << errnoCopy << ' ' << strerror(errnoCopy); + return sb.str(); +} + +template <typename Data, typename Comparator> +void dassertCompIsSane(const Comparator& comp, const Data& lhs, const Data& rhs) { +#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER) + // MSVC++ already does similar verification in debug mode in addition to using + // algorithms that do more comparisons. Doing our own verification in addition makes + // debug builds considerably slower without any additional safety. + + // test reversed comparisons + const int regular = comp(lhs, rhs); + if (regular == 0) { + invariant(comp(rhs, lhs) == 0); + } else if (regular < 0) { + invariant(comp(rhs, lhs) > 0); + } else { + invariant(comp(rhs, lhs) < 0); + } + + // test reflexivity + invariant(comp(lhs, lhs) == 0); + invariant(comp(rhs, rhs) == 0); +#endif +} + +/** + * Returns results from sorted in-memory storage. + */ +template <typename Key, typename Value> +class InMemIterator : public SortIteratorInterface<Key, Value> { +public: + typedef std::pair<Key, Value> Data; + + /// No data to iterate + InMemIterator() {} + + /// Only a single value + InMemIterator(const Data& singleValue) : _data(1, singleValue) {} + + /// Any number of values + template <typename Container> + InMemIterator(const Container& input) : _data(input.begin(), input.end()) {} + + InMemIterator(std::deque<Data> data) : _data(std::move(data)) {} + + void openSource() {} + void closeSource() {} + + bool more() { + return !_data.empty(); + } + Data next() { + Data out = std::move(_data.front()); + _data.pop_front(); + return out; + } + +private: + std::deque<Data> _data; +}; + +/** + * Returns results from a sorted range within a file. Each instance is given a file name and start + * and end offsets. + * + * This class is NOT responsible for file clean up / deletion. There are openSource() and + * closeSource() functions to ensure the FileIterator is not holding the file open when the file is + * deleted. Since it is one among many FileIterators, it cannot close a file that may still be in + * use elsewhere. + */ +template <typename Key, typename Value> +class FileIterator : public SortIteratorInterface<Key, Value> { +public: + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; + typedef std::pair<Key, Value> Data; + + FileIterator(std::shared_ptr<typename Sorter<Key, Value>::File> file, + std::streamoff fileStartOffset, + std::streamoff fileEndOffset, + const Settings& settings, + const boost::optional<std::string>& dbName, + const uint32_t checksum) + : _settings(settings), + _file(std::move(file)), + _fileStartOffset(fileStartOffset), + _fileCurrentOffset(fileStartOffset), + _fileEndOffset(fileEndOffset), + _dbName(dbName), + _originalChecksum(checksum) {} + + void openSource() {} + + void closeSource() { + // If the file iterator reads through all data objects, we can ensure non-corrupt data + // by comparing the newly calculated checksum with the original checksum from the data + // written to disk. Some iterators do not read back all data from the file, which prohibits + // the _afterReadChecksum from obtaining all the information needed. Thus, we only fassert + // if all data that was written to disk is read back and the checksums are not equivalent. + if (_done && _bufferReader->atEof() && (_originalChecksum != _afterReadChecksum)) { + fassert(31182, + Status(ErrorCodes::Error::ChecksumMismatch, + "Data read from disk does not match what was written to disk. Possible " + "corruption of data.")); + } + } + + bool more() { + if (!_done) + _fillBufferIfNeeded(); // may change _done + return !_done; + } + + Data next() { + invariant(!_done); + _fillBufferIfNeeded(); + + const char* startOfNewData = static_cast<const char*>(_bufferReader->pos()); + + // Note: calling read() on the _bufferReader buffer in the deserialize function advances the + // buffer. Since Key comes before Value in the _bufferReader, and C++ makes no function + // parameter evaluation order guarantees, we cannot deserialize Key and Value straight into + // the Data constructor + auto first = Key::deserializeForSorter(*_bufferReader, _settings.first); + auto second = Value::deserializeForSorter(*_bufferReader, _settings.second); + + // The difference of _bufferReader's position before and after reading the data + // will provide the length of the data that was just read. + const char* endOfNewData = static_cast<const char*>(_bufferReader->pos()); + + _afterReadChecksum = + addDataToChecksum(startOfNewData, endOfNewData - startOfNewData, _afterReadChecksum); + + return Data(std::move(first), std::move(second)); + } + + SorterRange getRange() const { + return {_fileStartOffset, _fileEndOffset, _originalChecksum}; + } + +private: + /** + * Attempts to refill the _bufferReader if it is empty. Expects _done to be false. + */ + void _fillBufferIfNeeded() { + invariant(!_done); + + if (!_bufferReader || _bufferReader->atEof()) + _fillBufferFromDisk(); + } + + /** + * Tries to read from disk and places any results in _bufferReader. If there is no more data to + * read, then _done is set to true and the function returns immediately. + */ + void _fillBufferFromDisk() { + int32_t rawSize; + _read(&rawSize, sizeof(rawSize)); + if (_done) + return; + + // negative size means compressed + const bool compressed = rawSize < 0; + int32_t blockSize = std::abs(rawSize); + + _buffer.reset(new char[blockSize]); + _read(_buffer.get(), blockSize); + uassert(16816, "file too short?", !_done); + + if (auto encryptionHooks = getEncryptionHooksIfEnabled()) { + std::unique_ptr<char[]> out(new char[blockSize]); + size_t outLen; + Status status = + encryptionHooks->unprotectTmpData(reinterpret_cast<const uint8_t*>(_buffer.get()), + blockSize, + reinterpret_cast<uint8_t*>(out.get()), + blockSize, + &outLen, + _dbName); + uassert(28841, + str::stream() << "Failed to unprotect data: " << status.toString(), + status.isOK()); + blockSize = outLen; + _buffer.swap(out); + } + + if (!compressed) { + _bufferReader.reset(new BufReader(_buffer.get(), blockSize)); + return; + } + + dassert(snappy::IsValidCompressedBuffer(_buffer.get(), blockSize)); + + size_t uncompressedSize; + uassert(17061, + "couldn't get uncompressed length", + snappy::GetUncompressedLength(_buffer.get(), blockSize, &uncompressedSize)); + + std::unique_ptr<char[]> decompressionBuffer(new char[uncompressedSize]); + uassert(17062, + "decompression failed", + snappy::RawUncompress(_buffer.get(), blockSize, decompressionBuffer.get())); + + // hold on to decompressed data and throw out compressed data at block exit + _buffer.swap(decompressionBuffer); + _bufferReader.reset(new BufReader(_buffer.get(), uncompressedSize)); + } + + /** + * Attempts to read data from disk. Sets _done to true when file offset reaches _fileEndOffset. + */ + void _read(void* out, size_t size) { + if (_fileCurrentOffset == _fileEndOffset) { + _done = true; + return; + } + + invariant(_fileCurrentOffset < _fileEndOffset, + str::stream() << "Current file offset (" << _fileCurrentOffset + << ") greater than end offset (" << _fileEndOffset << ")"); + + _file->read(_fileCurrentOffset, size, out); + _fileCurrentOffset += size; + } + + const Settings _settings; + bool _done = false; + + std::unique_ptr<char[]> _buffer; + std::unique_ptr<BufReader> _bufferReader; + std::shared_ptr<typename Sorter<Key, Value>::File> + _file; // File containing the sorted data range. + std::streamoff _fileStartOffset; // File offset at which the sorted data range starts. + std::streamoff _fileCurrentOffset; // File offset at which we are currently reading from. + std::streamoff _fileEndOffset; // File offset at which the sorted data range ends. + boost::optional<std::string> _dbName; + + // Checksum value that is updated with each read of a data object from disk. We can compare + // this value with _originalChecksum to check for data corruption if and only if the + // FileIterator is exhausted. + uint32_t _afterReadChecksum = 0; + + // Checksum value retrieved from SortedFileWriter that was calculated as data was spilled + // to disk. This is not modified, and is only used for comparison against _afterReadChecksum + // when the FileIterator is exhausted to ensure no data corruption. + const uint32_t _originalChecksum; +}; + +/** + * Merge-sorts results from 0 or more FileIterators, all of which should be iterating over sorted + * ranges within the same file. This class is given the data source file name upon construction and + * is responsible for deleting the data source file upon destruction. + */ +template <typename Key, typename Value, typename Comparator> +class MergeIterator : public SortIteratorInterface<Key, Value> { +public: + typedef SortIteratorInterface<Key, Value> Input; + typedef std::pair<Key, Value> Data; + + MergeIterator(const std::vector<std::shared_ptr<Input>>& iters, + const SortOptions& opts, + const Comparator& comp) + : _opts(opts), + _remaining(opts.limit ? opts.limit : std::numeric_limits<unsigned long long>::max()), + _first(true), + _greater(comp) { + for (size_t i = 0; i < iters.size(); i++) { + iters[i]->openSource(); + if (iters[i]->more()) { + _heap.push_back(std::make_shared<Stream>(i, iters[i]->next(), iters[i])); + } else { + iters[i]->closeSource(); + } + } + + if (_heap.empty()) { + _remaining = 0; + return; + } + + std::make_heap(_heap.begin(), _heap.end(), _greater); + std::pop_heap(_heap.begin(), _heap.end(), _greater); + _current = _heap.back(); + _heap.pop_back(); + } + + ~MergeIterator() { + _current.reset(); + _heap.clear(); + } + + void openSource() {} + void closeSource() {} + + bool more() { + if (_remaining > 0 && (_first || !_heap.empty() || _current->more())) + return true; + + _remaining = 0; + return false; + } + + Data next() { + verify(_remaining); + + _remaining--; + + if (_first) { + _first = false; + return _current->current(); + } + + if (!_current->advance()) { + verify(!_heap.empty()); + std::pop_heap(_heap.begin(), _heap.end(), _greater); + _current = _heap.back(); + _heap.pop_back(); + } else if (!_heap.empty() && _greater(_current, _heap.front())) { + std::pop_heap(_heap.begin(), _heap.end(), _greater); + std::swap(_current, _heap.back()); + std::push_heap(_heap.begin(), _heap.end(), _greater); + } + + return _current->current(); + } + + +private: + /** + * Data iterator over an Input stream. + * + * This class is responsible for closing the Input source upon destruction, unfortunately, + * because that is the path of least resistence to a design change requiring MergeIterator to + * handle eventual deletion of said Input source. + */ + class Stream { + public: + Stream(size_t fileNum, const Data& first, std::shared_ptr<Input> rest) + : fileNum(fileNum), _current(first), _rest(rest) {} + + ~Stream() { + _rest->closeSource(); + } + + const Data& current() const { + return _current; + } + bool more() { + return _rest->more(); + } + bool advance() { + if (!_rest->more()) + return false; + + _current = _rest->next(); + return true; + } + + const size_t fileNum; + + private: + Data _current; + std::shared_ptr<Input> _rest; + }; + + class STLComparator { // uses greater rather than less-than to maintain a MinHeap + public: + explicit STLComparator(const Comparator& comp) : _comp(comp) {} + + template <typename Ptr> + bool operator()(const Ptr& lhs, const Ptr& rhs) const { + // first compare data + dassertCompIsSane(_comp, lhs->current(), rhs->current()); + int ret = _comp(lhs->current(), rhs->current()); + if (ret) + return ret > 0; + + // then compare fileNums to ensure stability + return lhs->fileNum > rhs->fileNum; + } + + private: + const Comparator _comp; + }; + + SortOptions _opts; + unsigned long long _remaining; + bool _first; + std::shared_ptr<Stream> _current; + std::vector<std::shared_ptr<Stream>> _heap; // MinHeap + STLComparator _greater; // named so calls make sense +}; + +template <typename Key, typename Value, typename Comparator> +class NoLimitSorter : public Sorter<Key, Value> { +public: + typedef std::pair<Key, Value> Data; + typedef SortIteratorInterface<Key, Value> Iterator; + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; + + NoLimitSorter(const SortOptions& opts, + const Comparator& comp, + const Settings& settings = Settings()) + : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) { + invariant(opts.limit == 0); + } + + NoLimitSorter(const std::string& fileName, + const std::vector<SorterRange>& ranges, + const SortOptions& opts, + const Comparator& comp, + const Settings& settings = Settings()) + : Sorter<Key, Value>(opts, fileName), _comp(comp), _settings(settings) { + invariant(opts.extSortAllowed); + + uassert(16815, + str::stream() << "Unexpected empty file: " << this->_file->path().string(), + ranges.empty() || boost::filesystem::file_size(this->_file->path()) != 0); + + this->_iters.reserve(ranges.size()); + std::transform(ranges.begin(), + ranges.end(), + std::back_inserter(this->_iters), + [this](const SorterRange& range) { + return std::make_shared<sorter::FileIterator<Key, Value>>( + this->_file, + range.getStartOffset(), + range.getEndOffset(), + this->_settings, + this->_opts.dbName, + range.getChecksum()); + }); + } + + void add(const Key& key, const Value& val) { + invariant(!_done); + + _data.emplace_back(key.getOwned(), val.getOwned()); + + auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); + _memUsed += memUsage; + this->_totalDataSizeSorted += memUsage; + + if (_memUsed > this->_opts.maxMemoryUsageBytes) + spill(); + } + + void emplace(Key&& key, Value&& val) override { + invariant(!_done); + + auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); + _memUsed += memUsage; + this->_totalDataSizeSorted += memUsage; + + _data.emplace_back(std::move(key), std::move(val)); + + if (_memUsed > this->_opts.maxMemoryUsageBytes) + spill(); + } + + Iterator* done() { + invariant(!std::exchange(_done, true)); + + if (this->_iters.empty()) { + sort(); + if (this->_opts.moveSortedDataIntoIterator) { + return new InMemIterator<Key, Value>(std::move(_data)); + } + return new InMemIterator<Key, Value>(_data); + } + + spill(); + return Iterator::merge(this->_iters, this->_opts, _comp); + } + +private: + class STLComparator { + public: + explicit STLComparator(const Comparator& comp) : _comp(comp) {} + bool operator()(const Data& lhs, const Data& rhs) const { + dassertCompIsSane(_comp, lhs, rhs); + return _comp(lhs, rhs) < 0; + } + + private: + const Comparator& _comp; + }; + + void sort() { + STLComparator less(_comp); + std::stable_sort(_data.begin(), _data.end(), less); + this->_numSorted += _data.size(); + } + + void spill() { + if (_data.empty()) + return; + + if (!this->_opts.extSortAllowed) { + // This error message only applies to sorts from user queries made through the find or + // aggregation commands. Other clients, such as bulk index builds, should suppress this + // error, either by allowing external sorting or by catching and throwing a more + // appropriate error. + uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + str::stream() + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes + << " bytes, but did not opt in to external sorting."); + } + + sort(); + + SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings); + for (; !_data.empty(); _data.pop_front()) { + writer.addAlreadySorted(_data.front().first, _data.front().second); + } + Iterator* iteratorPtr = writer.done(); + + this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); + + _memUsed = 0; + } + + const Comparator _comp; + const Settings _settings; + bool _done = false; + size_t _memUsed = 0; + std::deque<Data> _data; // Data that has not been spilled. +}; + +template <typename Key, typename Value, typename Comparator> +class LimitOneSorter : public Sorter<Key, Value> { + // Since this class is only used for limit==1, it omits all logic to + // spill to disk and only tracks memory usage if explicitly requested. +public: + typedef std::pair<Key, Value> Data; + typedef SortIteratorInterface<Key, Value> Iterator; + + LimitOneSorter(const SortOptions& opts, const Comparator& comp) + : _comp(comp), _haveData(false) { + verify(opts.limit == 1); + } + + void add(const Key& key, const Value& val) { + Data contender(key, val); + + this->_numSorted += 1; + if (_haveData) { + dassertCompIsSane(_comp, _best, contender); + if (_comp(_best, contender) <= 0) + return; // not good enough + } else { + _haveData = true; + } + + _best = {contender.first.getOwned(), contender.second.getOwned()}; + } + + Iterator* done() { + if (_haveData) { + if (this->_opts.moveSortedDataIntoIterator) { + return new InMemIterator<Key, Value>(std::move(_best)); + } + return new InMemIterator<Key, Value>(_best); + } else { + return new InMemIterator<Key, Value>(); + } + } + +private: + void spill() { + invariant(false, "LimitOneSorter does not spill to disk"); + } + + const Comparator _comp; + Data _best; + bool _haveData; // false at start, set to true on first call to add() +}; + +template <typename Key, typename Value, typename Comparator> +class TopKSorter : public Sorter<Key, Value> { +public: + typedef std::pair<Key, Value> Data; + typedef SortIteratorInterface<Key, Value> Iterator; + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; + + TopKSorter(const SortOptions& opts, + const Comparator& comp, + const Settings& settings = Settings()) + : Sorter<Key, Value>(opts), + _comp(comp), + _settings(settings), + _memUsed(0), + _haveCutoff(false), + _worstCount(0), + _medianCount(0) { + // This also *works* with limit==1 but LimitOneSorter should be used instead + invariant(opts.limit > 1); + + // Preallocate a fixed sized vector of the required size if we don't expect it to have a + // major impact on our memory budget. This is the common case with small limits. + if (opts.limit < + std::min((opts.maxMemoryUsageBytes / 10) / sizeof(typename decltype(_data)::value_type), + _data.max_size())) { + _data.reserve(opts.limit); + } + } + + void add(const Key& key, const Value& val) { + invariant(!_done); + + this->_numSorted += 1; + + STLComparator less(_comp); + Data contender(key, val); + + if (_data.size() < this->_opts.limit) { + if (_haveCutoff && !less(contender, _cutoff)) + return; + + _data.emplace_back(contender.first.getOwned(), contender.second.getOwned()); + + auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); + _memUsed += memUsage; + this->_totalDataSizeSorted += memUsage; + + if (_data.size() == this->_opts.limit) + std::make_heap(_data.begin(), _data.end(), less); + + if (_memUsed > this->_opts.maxMemoryUsageBytes) + spill(); + + return; + } + + invariant(_data.size() == this->_opts.limit); + + if (!less(contender, _data.front())) + return; // not good enough + + // Remove the old worst pair and insert the contender, adjusting _memUsed + + auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); + _memUsed += memUsage; + this->_totalDataSizeSorted += memUsage; + + _memUsed -= _data.front().first.memUsageForSorter(); + _memUsed -= _data.front().second.memUsageForSorter(); + + std::pop_heap(_data.begin(), _data.end(), less); + _data.back() = {contender.first.getOwned(), contender.second.getOwned()}; + std::push_heap(_data.begin(), _data.end(), less); + + if (_memUsed > this->_opts.maxMemoryUsageBytes) + spill(); + } + + Iterator* done() { + if (this->_iters.empty()) { + sort(); + if (this->_opts.moveSortedDataIntoIterator) { + return new InMemIterator<Key, Value>(std::move(_data)); + } + return new InMemIterator<Key, Value>(_data); + } + + spill(); + Iterator* iterator = Iterator::merge(this->_iters, this->_opts, _comp); + _done = true; + return iterator; + } + +private: + class STLComparator { + public: + explicit STLComparator(const Comparator& comp) : _comp(comp) {} + bool operator()(const Data& lhs, const Data& rhs) const { + dassertCompIsSane(_comp, lhs, rhs); + return _comp(lhs, rhs) < 0; + } + + private: + const Comparator& _comp; + }; + + void sort() { + STLComparator less(_comp); + + if (_data.size() == this->_opts.limit) { + std::sort_heap(_data.begin(), _data.end(), less); + } else { + std::stable_sort(_data.begin(), _data.end(), less); + } + } + + // Can only be called after _data is sorted + void updateCutoff() { + // Theory of operation: We want to be able to eagerly ignore values we know will not + // be in the TopK result set by setting _cutoff to a value we know we have at least + // K values equal to or better than. There are two values that we track to + // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When + // one of these values becomes the new _cutoff, its associated counter is reset to 0 + // and a new value is chosen for that member the next time we spill. + // + // _worstSeen is the worst value we've seen so that all kept values are better than + // (or equal to) it. This means that once _worstCount >= _opts.limit there is no + // reason to consider values worse than _worstSeen so it can become the new _cutoff. + // This technique is especially useful when the input is already roughly sorted (eg + // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff + // that will exclude most later values, making the full TopK operation including + // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time. + // + // _lastMedian was the median of the _data in the first spill() either overall or + // following a promotion of _lastMedian to _cutoff. We count the number of kept + // values that are better than or equal to _lastMedian in _medianCount and can + // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming + // reasonable median selection (which should happen when the data is completely + // unsorted), after the first K spilled values, we will keep roughly 50% of the + // incoming values, 25% after the second K, 12.5% after the third K, etc. This means + // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values, + // so the expected number of kept values is O(Log(N/K) * K). The final run time if + // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) + + // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)). + // + // This leaves a currently unoptimized worst case of data that is already roughly + // sorted, but in the wrong direction, such that the desired results are all the + // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this + // should be trivially detectable, as a future optimization it might be nice to + // detect this case and reverse the direction of input (if possible) which would + // turn this into the best case described above. + // + // Pedantic notes: The time complexities above (which count number of comparisons) + // ignore the sorting of batches prior to spilling to disk since they make it more + // confusing without changing the results. If you want to add them back in, add an + // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also, + // all space complexities measure disk space rather than memory since this class is + // O(1) in memory due to the _opts.maxMemoryUsageBytes limit. + + STLComparator less(_comp); // less is "better" for TopK. + + // Pick a new _worstSeen or _lastMedian if should. + if (_worstCount == 0 || less(_worstSeen, _data.back())) { + _worstSeen = _data.back(); + } + if (_medianCount == 0) { + size_t medianIndex = _data.size() / 2; // chooses the higher if size() is even. + _lastMedian = _data[medianIndex]; + } + + // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian. + _worstCount += _data.size(); // everything is better or equal + typename std::vector<Data>::iterator firstWorseThanLastMedian = + std::upper_bound(_data.begin(), _data.end(), _lastMedian, less); + _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian); + + + // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should. + if (_worstCount >= this->_opts.limit) { + if (!_haveCutoff || less(_worstSeen, _cutoff)) { + _cutoff = _worstSeen; + _haveCutoff = true; + } + _worstCount = 0; + } + if (_medianCount >= this->_opts.limit) { + if (!_haveCutoff || less(_lastMedian, _cutoff)) { + _cutoff = _lastMedian; + _haveCutoff = true; + } + _medianCount = 0; + } + } + + void spill() { + invariant(!_done); + + if (_data.empty()) + return; + + if (!this->_opts.extSortAllowed) { + // This error message only applies to sorts from user queries made through the find or + // aggregation commands. Other clients should suppress this error, either by allowing + // external sorting or by catching and throwing a more appropriate error. + uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + str::stream() + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes + << " bytes, but did not opt in to external sorting. Aborting operation." + << " Pass allowDiskUse:true to opt in."); + } + + // We should check readOnly before getting here. + invariant(!storageGlobalParams.readOnly); + + sort(); + updateCutoff(); + + SortedFileWriter<Key, Value> writer(this->_opts, this->_file, _settings); + for (size_t i = 0; i < _data.size(); i++) { + writer.addAlreadySorted(_data[i].first, _data[i].second); + } + + // clear _data and release backing array's memory + std::vector<Data>().swap(_data); + + Iterator* iteratorPtr = writer.done(); + this->_iters.push_back(std::shared_ptr<Iterator>(iteratorPtr)); + + _memUsed = 0; + } + + const Comparator _comp; + const Settings _settings; + bool _done = false; + size_t _memUsed; + + // Data that has not been spilled. Organized as max-heap if size == limit. + std::vector<Data> _data; + + // See updateCutoff() for a full description of how these members are used. + bool _haveCutoff; + Data _cutoff; // We can definitely ignore values worse than this. + Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit. + size_t _worstCount; // Number of docs better or equal to _worstSeen kept so far. + Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit. + size_t _medianCount; // Number of docs better or equal to _lastMedian kept so far. +}; + +} // namespace sorter + +template <typename Key, typename Value> +Sorter<Key, Value>::Sorter(const SortOptions& opts) + : _opts(opts), + _file(opts.extSortAllowed + ? std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + nextFileName()) + : nullptr) {} + +template <typename Key, typename Value> +Sorter<Key, Value>::Sorter(const SortOptions& opts, const std::string& fileName) + : _opts(opts), + _file(std::make_shared<Sorter<Key, Value>::File>(opts.tempDir + "/" + fileName)) { + invariant(opts.extSortAllowed); + invariant(!opts.tempDir.empty()); + invariant(!fileName.empty()); +} + +template <typename Key, typename Value> +typename Sorter<Key, Value>::PersistedState Sorter<Key, Value>::persistDataForShutdown() { + spill(); + this->_file->keep(); + + std::vector<SorterRange> ranges; + ranges.reserve(_iters.size()); + std::transform(_iters.begin(), _iters.end(), std::back_inserter(ranges), [](const auto it) { + return it->getRange(); + }); + + return {_file->path().filename().string(), ranges}; +} + +template <typename Key, typename Value> +Sorter<Key, Value>::File::~File() { + if (_keep) { + return; + } + + if (_file.is_open()) { + DESTRUCTOR_GUARD(_file.exceptions(std::ios::failbit)); + DESTRUCTOR_GUARD(_file.close()); + } + + DESTRUCTOR_GUARD(boost::filesystem::remove(_path)); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::read(std::streamoff offset, std::streamsize size, void* out) { + if (!_file.is_open()) { + _open(); + } + + if (_offset != -1) { + _file.exceptions(std::ios::goodbit); + _file.flush(); + _offset = -1; + + uassert(5479100, + str::stream() << "Error flushing file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file); + } + + _file.seekg(offset); + _file.read(reinterpret_cast<char*>(out), size); + + uassert(16817, + str::stream() << "Error reading file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file); + + invariant(_file.gcount() == size, + str::stream() << "Number of bytes read (" << _file.gcount() + << ") not equal to expected number (" << size << ")"); + + uassert(51049, + str::stream() << "Error reading file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file.tellg() >= 0); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::write(const char* data, std::streamsize size) { + _ensureOpenForWriting(); + + try { + _file.write(data, size); + _offset += size; + } catch (const std::system_error& ex) { + if (ex.code() == std::errc::no_space_on_device) { + uasserted(ErrorCodes::OutOfDiskSpace, + str::stream() << ex.what() << ": " << _path.string()); + } + uasserted(5642403, + str::stream() << "Error writing to file " << _path.string() << ": " + << sorter::myErrnoWithDescription()); + } catch (const std::exception&) { + uasserted(16821, + str::stream() << "Error writing to file " << _path.string() << ": " + << sorter::myErrnoWithDescription()); + } +} + +template <typename Key, typename Value> +std::streamoff Sorter<Key, Value>::File::currentOffset() { + _ensureOpenForWriting(); + return _offset; +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::_open() { + invariant(!_file.is_open()); + + boost::filesystem::create_directories(_path.parent_path()); + + // We open the provided file in append mode so that SortedFileWriter instances can share + // the same file, used serially. We want to share files in order to stay below system + // open file limits. + _file.open(_path.string(), std::ios::app | std::ios::binary | std::ios::in | std::ios::out); + + uassert(16818, + str::stream() << "Error opening file " << _path.string() << ": " + << sorter::myErrnoWithDescription(), + _file.good()); +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::File::_ensureOpenForWriting() { + invariant(_offset != -1 || !_file.is_open()); + + if (_file.is_open()) { + return; + } + + _open(); + _file.exceptions(std::ios::failbit | std::ios::badbit); + _offset = boost::filesystem::file_size(_path); +} + +// +// SortedFileWriter +// + +template <typename Key, typename Value> +SortedFileWriter<Key, Value>::SortedFileWriter( + const SortOptions& opts, + std::shared_ptr<typename Sorter<Key, Value>::File> file, + const Settings& settings) + : _settings(settings), + _file(std::move(file)), + _fileStartOffset(_file->currentOffset()), + _dbName(opts.dbName) { + // This should be checked by consumers, but if we get here don't allow writes. + uassert( + 16946, "Attempting to use external sort from mongos. This is not allowed.", !isMongos()); + + uassert(17148, + "Attempting to use external sort without setting SortOptions::tempDir", + !opts.tempDir.empty()); +} + +template <typename Key, typename Value> +void SortedFileWriter<Key, Value>::addAlreadySorted(const Key& key, const Value& val) { + + // Offset that points to the place in the buffer where a new data object will be stored. + int _nextObjPos = _buffer.len(); + + // Add serialized key and value to the buffer. + key.serializeForSorter(_buffer); + val.serializeForSorter(_buffer); + + // Serializing the key and value grows the buffer, but _buffer.buf() still points to the + // beginning. Use _buffer.len() to determine portion of buffer containing new datum. + _checksum = + addDataToChecksum(_buffer.buf() + _nextObjPos, _buffer.len() - _nextObjPos, _checksum); + + if (_buffer.len() > 64 * 1024) + spill(); +} + +template <typename Key, typename Value> +void SortedFileWriter<Key, Value>::spill() { + int32_t size = _buffer.len(); + char* outBuffer = _buffer.buf(); + + if (size == 0) + return; + + std::string compressed; + snappy::Compress(outBuffer, size, &compressed); + verify(compressed.size() <= size_t(std::numeric_limits<int32_t>::max())); + + const bool shouldCompress = compressed.size() < size_t(_buffer.len() / 10 * 9); + if (shouldCompress) { + size = compressed.size(); + outBuffer = const_cast<char*>(compressed.data()); + } + + std::unique_ptr<char[]> out; + if (auto encryptionHooks = getEncryptionHooksIfEnabled()) { + size_t protectedSizeMax = size + encryptionHooks->additionalBytesForProtectedBuffer(); + out.reset(new char[protectedSizeMax]); + size_t resultLen; + Status status = encryptionHooks->protectTmpData(reinterpret_cast<const uint8_t*>(outBuffer), + size, + reinterpret_cast<uint8_t*>(out.get()), + protectedSizeMax, + &resultLen, + _dbName); + uassert(28842, + str::stream() << "Failed to compress data: " << status.toString(), + status.isOK()); + outBuffer = out.get(); + size = resultLen; + } + + // Negative size means compressed. + size = shouldCompress ? -size : size; + _file->write(reinterpret_cast<const char*>(&size), sizeof(size)); + _file->write(outBuffer, std::abs(size)); + + _buffer.reset(); +} + +template <typename Key, typename Value> +SortIteratorInterface<Key, Value>* SortedFileWriter<Key, Value>::done() { + spill(); + + return new sorter::FileIterator<Key, Value>( + _file, _fileStartOffset, _file->currentOffset(), _settings, _dbName, _checksum); +} + +// +// Factory Functions +// + +template <typename Key, typename Value> +template <typename Comparator> +SortIteratorInterface<Key, Value>* SortIteratorInterface<Key, Value>::merge( + const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const SortOptions& opts, + const Comparator& comp) { + return new sorter::MergeIterator<Key, Value, Comparator>(iters, opts, comp); +} + +template <typename Key, typename Value> +template <typename Comparator> +Sorter<Key, Value>* Sorter<Key, Value>::make(const SortOptions& opts, + const Comparator& comp, + const Settings& settings) { + checkNoExternalSortOnMongos(opts); + + uassert(17149, + "Attempting to use external sort without setting SortOptions::tempDir", + !(opts.extSortAllowed && opts.tempDir.empty())); + switch (opts.limit) { + case 0: + return new sorter::NoLimitSorter<Key, Value, Comparator>(opts, comp, settings); + case 1: + return new sorter::LimitOneSorter<Key, Value, Comparator>(opts, comp); + default: + return new sorter::TopKSorter<Key, Value, Comparator>(opts, comp, settings); + } +} + +template <typename Key, typename Value> +template <typename Comparator> +Sorter<Key, Value>* Sorter<Key, Value>::makeFromExistingRanges( + const std::string& fileName, + const std::vector<SorterRange>& ranges, + const SortOptions& opts, + const Comparator& comp, + const Settings& settings) { + checkNoExternalSortOnMongos(opts); + + invariant(opts.limit == 0, + str::stream() << "Creating a Sorter from existing ranges is only available with the " + "NoLimitSorter (limit 0), but got limit " + << opts.limit); + + return new sorter::NoLimitSorter<Key, Value, Comparator>( + fileName, ranges, opts, comp, settings); +} +} // namespace mongo diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index f714e62b7fe..357643b09b8 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -29,15 +29,25 @@ #pragma once -#include "mongo/db/sorter/sorted_data_iterator.h" +#include <third_party/murmurhash3/MurmurHash3.h> + +#include <boost/filesystem/path.hpp> +#include <deque> +#include <fstream> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "mongo/bson/util/builder.h" #include "mongo/db/sorter/sorter_gen.h" +#include "mongo/util/bufreader.h" -namespace mongo::sorter { /** - * The in-memory and external Sorter. + * This is the public API for the Sorter (both in-memory and external) * - * The Sorter is templated on Key and Value types, each of which require the following public - * members: + * Many of the classes in this file are templated on Key and Value types which + * require the following public members: * * // A type carrying extra information used by the deserializer. Contents are * // up to you, but it should be cheap to copy. Use an empty struct if your @@ -58,62 +68,276 @@ namespace mongo::sorter { * // Return *this if your type doesn't have an unowned state. * Type getOwned() const; * - * CompFn is a function that compares std::pair<Key, Value> and returns an int less than, equal to, - * or greater than 0 depending on how the two pairs compare with the same semantics as memcmp. + * Comparators are functors that that compare std::pair<Key, Value> and return an + * int less than, equal to, or greater than 0 depending on how the two pairs + * compare with the same semantics as memcmp. + * Example for Key=BSONObj, Value=int: + * + * class MyComparator { + * public: + * int operator()(const std::pair<BSONObj, int>& lhs, + * const std::pair<BSONObj, int>& rhs) { + * int ret = lhs.first.woCompare(rhs.first, _ord); + * if (ret) + * return ret; + * + * if (lhs.second > rhs.second) return 1; + * if (lhs.second == rhs.second) return 0; + * return -1; + * } + * Ordering _ord; + * }; + */ + +namespace mongo { + +/** + * Runtime options that control the Sorter's behavior + */ +struct SortOptions { + // The number of KV pairs to be returned. 0 indicates no limit. + unsigned long long limit; + + // When in-memory memory usage exceeds this value, we try to spill to disk. This is approximate. + size_t maxMemoryUsageBytes; + + // Whether we are allowed to spill to disk. If this is false and in-memory exceeds + // maxMemoryUsageBytes, we will uassert. + bool extSortAllowed; + + // In case the sorter spills encrypted data to disk that must be readable even after process + // restarts, it must encrypt with a persistent key. This key is accessed using the database + // name that the sorted collection lives in. If encryption is enabled and dbName is boost::none, + // a temporary key is used. + boost::optional<std::string> dbName; + + // Directory into which we place a file when spilling to disk. Must be explicitly set if + // extSortAllowed is true. + std::string tempDir; + + // If set to true and sorted data fits into memory, sorted data will be moved into iterator + // instead of copying. + bool moveSortedDataIntoIterator; + + SortOptions() + : limit(0), + maxMemoryUsageBytes(64 * 1024 * 1024), + extSortAllowed(false), + moveSortedDataIntoIterator(false) {} + + // Fluent API to support expressions like SortOptions().Limit(1000).ExtSortAllowed(true) + + SortOptions& Limit(unsigned long long newLimit) { + limit = newLimit; + return *this; + } + + SortOptions& MaxMemoryUsageBytes(size_t newMaxMemoryUsageBytes) { + maxMemoryUsageBytes = newMaxMemoryUsageBytes; + return *this; + } + + SortOptions& ExtSortAllowed(bool newExtSortAllowed = true) { + extSortAllowed = newExtSortAllowed; + return *this; + } + + SortOptions& TempDir(const std::string& newTempDir) { + tempDir = newTempDir; + return *this; + } + + SortOptions& DBName(std::string newDbName) { + dbName = std::move(newDbName); + return *this; + } + + SortOptions& MoveSortedDataIntoIterator(bool newMoveSortedDataIntoIterator = true) { + moveSortedDataIntoIterator = newMoveSortedDataIntoIterator; + return *this; + } +}; + +/** + * This is a 0-sized dummy object that satisfies Sorter's Key/Value interface. + */ +class NullValue { +public: + struct SorterDeserializeSettings {}; // unused + void serializeForSorter(BufBuilder& buf) const { + return; + } + static NullValue deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) { + return {}; + } + int memUsageForSorter() const { + return 0; + } + NullValue getOwned() const { + return {}; + } +}; + +/** + * This is the sorted output iterator from the sorting framework. + */ +template <typename Key, typename Value> +class SortIteratorInterface { + SortIteratorInterface(const SortIteratorInterface&) = delete; + SortIteratorInterface& operator=(const SortIteratorInterface&) = delete; + +public: + typedef std::pair<Key, Value> Data; + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; + + // Unowned objects are only valid until next call to any method + + virtual bool more() = 0; + virtual std::pair<Key, Value> next() = 0; + + virtual ~SortIteratorInterface() {} + + // Returns an iterator that merges the passed in iterators + template <typename Comparator> + static SortIteratorInterface* merge( + const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, + const SortOptions& opts, + const Comparator& comp); + + // Opens and closes the source of data over which this class iterates, if applicable. + virtual void openSource() = 0; + virtual void closeSource() = 0; + + virtual SorterRange getRange() const { + invariant(false, "Only FileIterator has ranges"); + MONGO_UNREACHABLE; + } + +protected: + SortIteratorInterface() {} // can only be constructed as a base +}; + +/** + * This is the way to input data to the sorting framework. + * + * Each instance of this class will generate a file name and spill sorted data ranges to that file + * if allowed in its given Settings. If the instance destructs before done() is called, it will + * handle deleting the data file used for spills. Otherwise, if done() is called, responsibility for + * file deletion moves to the returned Iterator object, which must then delete the file upon its own + * destruction. + * + * All users of Sorter implementations must define their own nextFileName() function to generate + * unique file names for spills to disk. This is necessary because the sorter.cpp file is separately + * directly included in multiple places, rather than compiled in one place and linked, and so cannot + * itself provide a globally unique ID for file names. See existing function implementations of + * nextFileName() for example. */ template <typename Key, typename Value> class Sorter { + Sorter(const Sorter&) = delete; + Sorter& operator=(const Sorter&) = delete; + public: - using Data = std::pair<Key, Value>; - using Iterator = SortedDataIterator<Key, Value>; - using CompFn = std::function<int(const Data&, const Data&)>; - using Settings = std::pair<typename Key::SorterDeserializeSettings, - typename Value::SorterDeserializeSettings>; + typedef std::pair<Key, Value> Data; + typedef SortIteratorInterface<Key, Value> Iterator; + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; struct PersistedState { std::string fileName; std::vector<SorterRange> ranges; }; - explicit Sorter(const CompFn& comp) : _comp(comp) {} + /** + * Represents the file that a Sorter uses to spill to disk. Supports reading after writing (or + * reading without any writing), but does not support writing after any reading has been done. + */ + class File { + public: + File(std::string path) : _path(std::move(path)) { + invariant(!_path.empty()); + } - Sorter(const Sorter&) = delete; - Sorter& operator=(const Sorter&) = delete; + ~File(); - virtual ~Sorter() {} + const boost::filesystem::path& path() const { + return _path; + } - /** - * Adds the key/value. The Sorter may make its own owned copy of the data. - */ - virtual void add(const Key& key, const Value& value) = 0; + /** + * Signals that the on-disk file should not be cleaned up. + */ + void keep() { + _keep = true; + }; + + /** + * Reads the requested data from the file. Cannot write more to the file once this has been + * called. + */ + void read(std::streamoff offset, std::streamsize size, void* out); + + /** + * Writes the given data to the end of the file. Cannot be called after reading. + */ + void write(const char* data, std::streamsize size); + + /** + * Returns the current offset of the end of the file. Cannot be called after reading. + */ + std::streamoff currentOffset(); + + private: + void _open(); + + void _ensureOpenForWriting(); + + boost::filesystem::path _path; + std::fstream _file; + + // The current offset of the end of the file, or -1 if the file either has not yet been + // opened or is already being read. + std::streamoff _offset = -1; + + // Whether to keep the on-disk file even after this in-memory object has been destructed. + bool _keep = false; + }; + + explicit Sorter(const SortOptions& opts); /** - * Adds the key/value which aleady own their underlying data. The Sorter may (or may not) take - * advantage of this by not making a new copy of the data. + * ExtSort-only constructor. fileName is the base name of a file in the temp directory. */ - virtual void addOwned(Key&& key, Value&& value) { - add(key, value); - } + Sorter(const SortOptions& opts, const std::string& fileName); + + template <typename Comparator> + static Sorter* make(const SortOptions& opts, + const Comparator& comp, + const Settings& settings = Settings()); + template <typename Comparator> + static Sorter* makeFromExistingRanges(const std::string& fileName, + const std::vector<SorterRange>& ranges, + const SortOptions& opts, + const Comparator& comp, + const Settings& settings = Settings()); + + virtual void add(const Key&, const Value&) = 0; + virtual void emplace(Key&& k, Value&& v) { + add(k, v); + } /** - * Returns an Iterator to iterate through the sorted data. The Sorter must outlive the returned - * Iterator. - * - * If the Sorter does not spill to disk, the specified return policy determines whether the - * Iterator returns data from the Sorter via move or via copy. Returning data via copy is useful - * if you may later need the data to still be intact in order to be spilled. - * - * If the sorter does spill to disk, the specified return policy is ignored. + * Cannot add more data after calling done(). */ - virtual std::unique_ptr<Iterator> done( - typename Iterator::ReturnPolicy returnPolicy = Iterator::ReturnPolicy::kMove) = 0; + virtual Iterator* done() = 0; - virtual PersistedState persistDataForShutdown() { - MONGO_UNREACHABLE; - } + virtual ~Sorter() {} - virtual size_t numSpills() const { - return 0; + size_t numSpills() const { + return _iters.size(); } size_t numSorted() const { @@ -124,12 +348,98 @@ public: return _totalDataSizeSorted; } + PersistedState persistDataForShutdown(); + protected: - const CompFn _comp; + Sorter() {} // can only be constructed as a base + + virtual void spill() = 0; + + size_t _numSorted = 0; // Keeps track of the number of keys sorted. + uint64_t _totalDataSizeSorted = 0; // Keeps track of the total size of data sorted. + + SortOptions _opts; + + std::shared_ptr<File> _file; + + std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled. +}; + +/** + * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file + * range. + */ +template <typename Key, typename Value> +class SortedFileWriter { + SortedFileWriter(const SortedFileWriter&) = delete; + SortedFileWriter& operator=(const SortedFileWriter&) = delete; - size_t _numSorted = 0; - uint64_t _totalDataSizeSorted = 0; +public: + typedef SortIteratorInterface<Key, Value> Iterator; + typedef std::pair<typename Key::SorterDeserializeSettings, + typename Value::SorterDeserializeSettings> + Settings; + + explicit SortedFileWriter(const SortOptions& opts, + std::shared_ptr<typename Sorter<Key, Value>::File> file, + const Settings& settings = Settings()); + + void addAlreadySorted(const Key&, const Value&); + + /** + * Spills any data remaining in the buffer to disk and then closes the file to which data was + * written. + * + * No more data can be added via addAlreadySorted() after calling done(). + */ + Iterator* done(); - bool _done = false; +private: + void spill(); + + const Settings _settings; + std::shared_ptr<typename Sorter<Key, Value>::File> _file; + BufBuilder _buffer; + + // Keeps track of the hash of all data objects spilled to disk. Passed to the FileIterator + // to ensure data has not been corrupted after reading from disk. + uint32_t _checksum = 0; + + // Tracks where in the file we started writing the sorted data range so that the information can + // be given to the Iterator in done(). + std::streamoff _fileStartOffset; + + boost::optional<std::string> _dbName; }; -} // namespace mongo::sorter +} // namespace mongo + +/** + * #include "mongo/db/sorter/sorter.cpp" and call this in a single translation + * unit once for each unique set of template parameters. + */ +#define MONGO_CREATE_SORTER(Key, Value, Comparator) \ + /* public classes */ \ + template class ::mongo::Sorter<Key, Value>; \ + template class ::mongo::SortIteratorInterface<Key, Value>; \ + template class ::mongo::SortedFileWriter<Key, Value>; \ + /* internal classes */ \ + template class ::mongo::sorter::NoLimitSorter<Key, Value, Comparator>; \ + template class ::mongo::sorter::LimitOneSorter<Key, Value, Comparator>; \ + template class ::mongo::sorter::TopKSorter<Key, Value, Comparator>; \ + template class ::mongo::sorter::MergeIterator<Key, Value, Comparator>; \ + template class ::mongo::sorter::InMemIterator<Key, Value>; \ + template class ::mongo::sorter::FileIterator<Key, Value>; \ + /* factory functions */ \ + template ::mongo::SortIteratorInterface<Key, Value>* ::mongo:: \ + SortIteratorInterface<Key, Value>::merge<Comparator>( \ + const std::vector<std::shared_ptr<SortIteratorInterface>>& iters, \ + const SortOptions& opts, \ + const Comparator& comp); \ + template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::make<Comparator>( \ + const SortOptions& opts, const Comparator& comp, const Settings& settings); \ + template ::mongo::Sorter<Key, Value>* ::mongo::Sorter<Key, Value>::makeFromExistingRanges< \ + Comparator>(const std::string& fileName, \ + const std::vector<SorterRange>& ranges, \ + const SortOptions& opts, \ + const Comparator& comp, \ + const Settings& settings); diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index f9b887515a1..4607fef685c 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -29,6 +29,8 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest +#include "mongo/platform/basic.h" + #include <boost/filesystem.hpp> #include <fstream> #include <memory> @@ -36,10 +38,6 @@ #include "mongo/base/data_type_endian.h" #include "mongo/base/static_assert.h" #include "mongo/config.h" -#include "mongo/db/sorter/factory.h" -#include "mongo/db/sorter/in_mem_iterator.h" -#include "mongo/db/sorter/single_elem_iterator.h" -#include "mongo/db/sorter/sorted_file_writer.h" #include "mongo/db/sorter/sorter.h" #include "mongo/logv2/log.h" #include "mongo/platform/random.h" @@ -48,9 +46,36 @@ #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" -namespace mongo::sorter { + +namespace mongo { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicWord<unsigned> sorterTestFileCounter; + return "extsort-sorter-test." + std::to_string(sorterTestFileCounter.fetchAndAdd(1)); +} + +} // namespace mongo + +// Need access to internal classes +#include "mongo/db/sorter/sorter.cpp" + +namespace mongo { +namespace sorter { namespace { +// +// Sorter framework testing utilities +// + class IntWrapper { public: IntWrapper(int i = 0) : _i(i) {} @@ -78,7 +103,7 @@ private: }; typedef std::pair<IntWrapper, IntWrapper> IWPair; -typedef SortedDataIterator<IntWrapper, IntWrapper> IWIterator; +typedef SortIteratorInterface<IntWrapper, IntWrapper> IWIterator; typedef Sorter<IntWrapper, IntWrapper> IWSorter; enum Direction { ASC = 1, DESC = -1 }; @@ -103,7 +128,7 @@ public: : _current(start), _increment(increment), _stop(stop) {} void openSource() {} void closeSource() {} - bool more() const { + bool more() { if (_increment == 0) return true; if (_increment > 0) @@ -126,7 +151,7 @@ class EmptyIterator : public IWIterator { public: void openSource() {} void closeSource() {} - bool more() const { + bool more() { return false; } Data next() { @@ -136,15 +161,15 @@ public: class LimitIterator : public IWIterator { public: - LimitIterator(long long limit, std::unique_ptr<IWIterator> source) - : _remaining(limit), _source(std::move(source)) { + LimitIterator(long long limit, std::shared_ptr<IWIterator> source) + : _remaining(limit), _source(source) { verify(limit > 0); } void openSource() {} void closeSource() {} - bool more() const { + bool more() { return _remaining && _source->more(); } Data next() { @@ -155,13 +180,15 @@ public: private: long long _remaining; - std::unique_ptr<IWIterator> _source; + std::shared_ptr<IWIterator> _source; }; template <typename It1, typename It2> void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { int iteration; try { + it1->openSource(); + it2->openSource(); for (iteration = 0; true; iteration++) { ASSERT_EQUALS(it1->more(), it2->more()); ASSERT_EQUALS(it1->more(), it2->more()); // make sure more() is safe to call twice @@ -173,68 +200,67 @@ void _assertIteratorsEquivalent(It1 it1, It2 it2, int line) { ASSERT_EQUALS(pair1.first, pair2.first); ASSERT_EQUALS(pair1.second, pair2.second); } + it1->closeSource(); + it2->closeSource(); } catch (...) { - LOGV2(22047, "Failure", "line"_attr = line, "iteration"_attr = iteration); + LOGV2(22047, + "Failure from line {line} on iteration {iteration}", + "line"_attr = line, + "iteration"_attr = iteration); + it1->closeSource(); + it2->closeSource(); throw; } } #define ASSERT_ITERATORS_EQUIVALENT(it1, it2) _assertIteratorsEquivalent(it1, it2, __LINE__) -std::vector<IWPair> makeDataForInMemIterator(const std::vector<int>& ints) { - std::vector<IWPair> data; - for (auto i : ints) { - data.emplace_back(i, -i); - } - return data; -} - -std::unique_ptr<IWIterator> makeInMemIterator(std::vector<IWPair>& data) { - return std::make_unique<InMemIterator<IntWrapper, IntWrapper>>(data, - IWIterator::ReturnPolicy::kMove); +template <int N> +std::shared_ptr<IWIterator> makeInMemIterator(const int (&array)[N]) { + std::vector<IWPair> vec; + for (int i = 0; i < N; i++) + vec.push_back(IWPair(array[i], -array[i])); + return std::make_shared<sorter::InMemIterator<IntWrapper, IntWrapper>>(vec); } -std::unique_ptr<IWIterator> mergeIterators( - const std::vector<std::unique_ptr<IWIterator>>& iterators, - Direction Dir = ASC, - const Options& opts = Options()) { - invariant(!opts.tempDir); - return std::make_unique<MergeIterator<IntWrapper, IntWrapper>>( - iterators, opts.limit, IWComparator(Dir)); +template <typename IteratorPtr, int N> +std::shared_ptr<IWIterator> mergeIterators(IteratorPtr (&array)[N], + Direction Dir = ASC, + const SortOptions& opts = SortOptions()) { + invariant(!opts.extSortAllowed); + std::vector<std::shared_ptr<IWIterator>> vec; + for (int i = 0; i < N; i++) + vec.push_back(std::shared_ptr<IWIterator>(array[i])); + return std::shared_ptr<IWIterator>(IWIterator::merge(vec, opts, IWComparator(Dir))); } // // Tests for Sorter framework internals // -class SingleElemIterTests { +class InMemIterTests { +public: void run() { { EmptyIterator empty; - SingleElemIterator<IntWrapper, IntWrapper> singleElem{IWIterator::ReturnPolicy::kMove}; - ASSERT_ITERATORS_EQUIVALENT(&singleElem, &empty); + sorter::InMemIterator<IntWrapper, IntWrapper> inMem; + ASSERT_ITERATORS_EQUIVALENT(&inMem, &empty); } - } -}; - -class InMemIterTests { -public: - void run() { { - auto data = makeDataForInMemIterator( - {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}); - ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(data), - std::make_unique<IntIterator>(0, 20)); + static const int zeroUpTo20[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}; + ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(zeroUpTo20), + std::make_shared<IntIterator>(0, 20)); } { - // Make sure InMemIterator doesn't do any reordering on it's own. - static std::vector<int> unsorted{6, 3, 7, 4, 0, 9, 5, 7, 1, 8}; + // make sure InMemIterator doesn't do any reordering on it's own + static const int unsorted[] = {6, 3, 7, 4, 0, 9, 5, 7, 1, 8}; class UnsortedIter : public IWIterator { public: UnsortedIter() : _pos(0) {} void openSource() {} void closeSource() {} - bool more() const { - return _pos < unsorted.size(); + bool more() { + return _pos < sizeof(unsorted) / sizeof(unsorted[0]); } IWPair next() { IWPair ret(unsorted[_pos], -unsorted[_pos]); @@ -244,8 +270,7 @@ public: size_t _pos; } unsortedIter; - auto data = makeDataForInMemIterator(unsorted); - ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(data), + ASSERT_ITERATORS_EQUIVALENT(makeInMemIterator(unsorted), static_cast<IWIterator*>(&unsortedIter)); } } @@ -255,27 +280,29 @@ class SortedFileWriterAndFileIteratorTests { public: void run() { unittest::TempDir tempDir("sortedFileWriterTests"); - Options opts; - opts.tempDir = tempDir.path(); + const SortOptions opts = SortOptions().TempDir(tempDir.path()); + auto makeFile = [&] { + return std::make_shared<Sorter<IntWrapper, IntWrapper>::File>(opts.tempDir + "/" + + nextFileName()); + }; { // small - auto file = std::make_unique<File>(*opts.tempDir + "/" + nextFileName("sorter-test")); - SortedFileWriter<IntWrapper, IntWrapper> sorter(file.get()); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile()); sorter.addAlreadySorted(0, 0); sorter.addAlreadySorted(1, -1); sorter.addAlreadySorted(2, -2); sorter.addAlreadySorted(3, -3); sorter.addAlreadySorted(4, -4); - ASSERT_ITERATORS_EQUIVALENT(sorter.done(), std::make_unique<IntIterator>(0, 5)); + ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), + std::make_shared<IntIterator>(0, 5)); } { // big - auto file = std::make_unique<File>(*opts.tempDir + "/" + nextFileName("sorter-test")); - SortedFileWriter<IntWrapper, IntWrapper> sorter(file.get()); + SortedFileWriter<IntWrapper, IntWrapper> sorter(opts, makeFile()); for (int i = 0; i < 10 * 1000 * 1000; i++) sorter.addAlreadySorted(i, -i); - ASSERT_ITERATORS_EQUIVALENT(sorter.done(), - std::make_unique<IntIterator>(0, 10 * 1000 * 1000)); + ASSERT_ITERATORS_EQUIVALENT(std::shared_ptr<IWIterator>(sorter.done()), + std::make_shared<IntIterator>(0, 10 * 1000 * 1000)); } ASSERT(boost::filesystem::is_empty(tempDir.path())); @@ -287,48 +314,49 @@ class MergeIteratorTests { public: void run() { { // test empty (no inputs) - ASSERT_ITERATORS_EQUIVALENT(mergeIterators({}, ASC), std::make_unique<EmptyIterator>()); + std::vector<std::shared_ptr<IWIterator>> vec; + std::shared_ptr<IWIterator> mergeIter( + IWIterator::merge(vec, SortOptions(), IWComparator())); + ASSERT_ITERATORS_EQUIVALENT(mergeIter, std::make_shared<EmptyIterator>()); } { // test empty (only empty inputs) - std::vector<std::unique_ptr<IWIterator>> iterators; - iterators.push_back(std::make_unique<EmptyIterator>()); - iterators.push_back(std::make_unique<EmptyIterator>()); - iterators.push_back(std::make_unique<EmptyIterator>()); + std::shared_ptr<IWIterator> iterators[] = {std::make_shared<EmptyIterator>(), + std::make_shared<EmptyIterator>(), + std::make_shared<EmptyIterator>()}; ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, ASC), - std::make_unique<EmptyIterator>()); + std::make_shared<EmptyIterator>()); } { // test ASC - std::vector<std::unique_ptr<IWIterator>> iterators; - iterators.push_back(std::make_unique<IntIterator>(1, 20, 2)); // 1, 3, ... 19 - iterators.push_back(std::make_unique<IntIterator>(0, 20, 2)); // 0, 2, ... 18 + std::shared_ptr<IWIterator> iterators[] = { + std::make_shared<IntIterator>(1, 20, 2) // 1, 3, ... 19 + , + std::make_shared<IntIterator>(0, 20, 2) // 0, 2, ... 18 + }; ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, ASC), - std::make_unique<IntIterator>(0, 20, 1)); + std::make_shared<IntIterator>(0, 20, 1)); } { // test DESC with an empty source - std::vector<std::unique_ptr<IWIterator>> iterators; - iterators.push_back(std::make_unique<IntIterator>(30, 0, -3)); // 30, 27, ... 3 - iterators.push_back(std::make_unique<IntIterator>(29, 0, -3)); // 29, 26, ... 2 - iterators.push_back(std::make_unique<IntIterator>(28, 0, -3)); // 28, 25, ... 1 - iterators.push_back(std::make_unique<EmptyIterator>()); // 28, 25, ... 1 + std::shared_ptr<IWIterator> iterators[] = { + std::make_shared<IntIterator>(30, 0, -3), // 30, 27, ... 3 + std::make_shared<IntIterator>(29, 0, -3), // 29, 26, ... 2 + std::make_shared<IntIterator>(28, 0, -3), // 28, 25, ... 1 + std::make_shared<EmptyIterator>()}; ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iterators, DESC), - std::make_unique<IntIterator>(30, 0, -1)); + std::make_shared<IntIterator>(30, 0, -1)); } { // test Limit - std::vector<std::unique_ptr<IWIterator>> iterators; - iterators.push_back(std::make_unique<IntIterator>(1, 20, 2)); // 1, 3, ... 19 - iterators.push_back(std::make_unique<IntIterator>(0, 20, 2)); // 0, 2, ... 18 - - Options opts; - opts.limit = 10; + std::shared_ptr<IWIterator> iterators[] = { + std::make_shared<IntIterator>(1, 20, 2), // 1, 3, ... 19 + std::make_shared<IntIterator>(0, 20, 2)}; // 0, 2, ... 18 ASSERT_ITERATORS_EQUIVALENT( - mergeIterators(iterators, ASC, opts), - std::make_unique<LimitIterator>(10, std::make_unique<IntIterator>(0, 20, 1))); + mergeIterators(iterators, ASC, SortOptions().Limit(10)), + std::make_shared<LimitIterator>(10, std::make_shared<IntIterator>(0, 20, 1))); } } }; @@ -340,63 +368,54 @@ public: void run() { unittest::TempDir tempDir("sorterTests"); - Options opts; - opts.tempDir = tempDir.path(); + const SortOptions opts = SortOptions().TempDir(tempDir.path()).ExtSortAllowed(); { // test empty (no limit) - ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(), - std::make_unique<EmptyIterator>()); + ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(opts).get()), + std::make_shared<EmptyIterator>()); } { // test empty (limit 1) - opts.limit = 1; - ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(), - std::make_unique<EmptyIterator>()); + ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(SortOptions(opts).Limit(1)).get()), + std::make_shared<EmptyIterator>()); } { // test empty (limit 10) - opts.limit = 10; - ASSERT_ITERATORS_EQUIVALENT(makeSorter(opts)->done(), - std::make_unique<EmptyIterator>()); + ASSERT_ITERATORS_EQUIVALENT(done(makeSorter(SortOptions(opts).Limit(10)).get()), + std::make_shared<EmptyIterator>()); } - opts.limit = 0; const auto runTests = [this, &opts](bool assertRanges) { { // test all data ASC - std::unique_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(ASC)); + std::shared_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(ASC)); addData(sorter.get()); - ASSERT_ITERATORS_EQUIVALENT(sorter->done(), correct()); + ASSERT_ITERATORS_EQUIVALENT(done(sorter.get()), correct()); ASSERT_EQ(numAdded(), sorter->numSorted()); if (assertRanges) { assertRangeInfo(sorter, opts); } } { // test all data DESC - std::unique_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(DESC)); + std::shared_ptr<IWSorter> sorter = makeSorter(opts, IWComparator(DESC)); addData(sorter.get()); - ASSERT_ITERATORS_EQUIVALENT(sorter->done(), correctReverse()); + ASSERT_ITERATORS_EQUIVALENT(done(sorter.get()), correctReverse()); ASSERT_EQ(numAdded(), sorter->numSorted()); if (assertRanges) { assertRangeInfo(sorter, opts); } } - // The debug builds are too slow to run these tests. - // Among other things, MSVC++ makes all heap functions O(N) not O(logN). +// The debug builds are too slow to run these tests. +// Among other things, MSVC++ makes all heap functions O(N) not O(logN). #if !defined(MONGO_CONFIG_DEBUG_BUILD) { // merge all data ASC - std::unique_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(ASC)), + std::shared_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(ASC)), makeSorter(opts, IWComparator(ASC))}; addData(sorters[0].get()); addData(sorters[1].get()); - std::vector<std::unique_ptr<IWIterator>> iters1; - iters1.push_back(sorters[0]->done()); - iters1.push_back(sorters[1]->done()); - - std::vector<std::unique_ptr<IWIterator>> iters2; - iters2.push_back(correct()); - iters2.push_back(correct()); - + std::shared_ptr<IWIterator> iters1[] = {done(sorters[0].get()), + done(sorters[1].get())}; + std::shared_ptr<IWIterator> iters2[] = {correct(), correct()}; ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iters1, ASC), mergeIterators(iters2, ASC)); @@ -406,22 +425,16 @@ public: } } { // merge all data DESC and use multiple threads to insert - std::unique_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(DESC)), + std::shared_ptr<IWSorter> sorters[] = {makeSorter(opts, IWComparator(DESC)), makeSorter(opts, IWComparator(DESC))}; stdx::thread inBackground(&Basic::addData, this, sorters[0].get()); addData(sorters[1].get()); inBackground.join(); - std::vector<std::unique_ptr<IWIterator>> iters1; - - iters1.push_back(sorters[0]->done()); - iters1.push_back(sorters[1]->done()); - - std::vector<std::unique_ptr<IWIterator>> iters2; - iters2.push_back(correctReverse()); - iters2.push_back(correctReverse()); - + std::shared_ptr<IWIterator> iters1[] = {done(sorters[0].get()), + done(sorters[1].get())}; + std::shared_ptr<IWIterator> iters2[] = {correctReverse(), correctReverse()}; ASSERT_ITERATORS_EQUIVALENT(mergeIterators(iters1, DESC), mergeIterators(iters2, DESC)); @@ -463,13 +476,13 @@ public: } // returns an iterator with the correct results - virtual std::unique_ptr<IWIterator> correct() { - return std::make_unique<IntIterator>(0, 5); // 0, 1, ... 4 + virtual std::shared_ptr<IWIterator> correct() { + return std::make_shared<IntIterator>(0, 5); // 0, 1, ... 4 } // like correct but with opposite sort direction - virtual std::unique_ptr<IWIterator> correctReverse() { - return std::make_unique<IntIterator>(4, -1, -1); // 4, 3, ... 0 + virtual std::shared_ptr<IWIterator> correctReverse() { + return std::make_shared<IntIterator>(4, -1, -1); // 4, 3, ... 0 } virtual size_t correctNumRanges() const { @@ -477,23 +490,27 @@ public: } // It is safe to ignore / overwrite any part of options - virtual Options adjustSortOptions(Options opts) { + virtual SortOptions adjustSortOptions(SortOptions opts) { return opts; } private: // Make a new sorter with desired opts and comp. Opts may be ignored but not comp - std::unique_ptr<IWSorter> makeSorter(Options opts, IWComparator comp = IWComparator(ASC)) { - return sorter::make<IntWrapper, IntWrapper>("sorter-test", adjustSortOptions(opts), comp); + std::shared_ptr<IWSorter> makeSorter(SortOptions opts, IWComparator comp = IWComparator(ASC)) { + return std::shared_ptr<IWSorter>(IWSorter::make(adjustSortOptions(opts), comp)); + } + + std::shared_ptr<IWIterator> done(IWSorter* sorter) { + return std::shared_ptr<IWIterator>(sorter->done()); } - void assertRangeInfo(const std::unique_ptr<IWSorter>& sorter, const Options& opts) { + void assertRangeInfo(const std::shared_ptr<IWSorter>& sorter, const SortOptions& opts) { auto numRanges = correctNumRanges(); if (numRanges == 0) return; auto state = sorter->persistDataForShutdown(); - if (opts.tempDir) { + if (opts.extSortAllowed) { ASSERT_NE(state.fileName, ""); } ASSERT_EQ(state.ranges.size(), numRanges); @@ -501,9 +518,8 @@ private: }; class Limit : public Basic { - Options adjustSortOptions(Options opts) override { - opts.limit = 5; - return opts; + SortOptions adjustSortOptions(SortOptions opts) override { + return opts.Limit(5); } void addData(IWSorter* sorter) override { sorter->add(0, 0); @@ -516,19 +532,18 @@ class Limit : public Basic { size_t numAdded() const override { return 6; } - std::unique_ptr<IWIterator> correct() override { - return std::make_unique<IntIterator>(-1, 4); + std::shared_ptr<IWIterator> correct() override { + return std::make_shared<IntIterator>(-1, 4); } - std::unique_ptr<IWIterator> correctReverse() override { - return std::make_unique<IntIterator>(4, -1, -1); + std::shared_ptr<IWIterator> correctReverse() override { + return std::make_shared<IntIterator>(4, -1, -1); } }; template <uint64_t Limit> class LimitExtreme : public Basic { - Options adjustSortOptions(Options opts) override { - opts.limit = Limit; - return opts; + SortOptions adjustSortOptions(SortOptions opts) override { + return opts.Limit(Limit); } }; @@ -548,13 +563,13 @@ class Dupes : public Basic { size_t numAdded() const override { return 10; } - std::unique_ptr<IWIterator> correct() override { - static auto data = makeDataForInMemIterator({-1, -1, -1, 0, 1, 1, 1, 2, 2, 3}); - return makeInMemIterator(data); + std::shared_ptr<IWIterator> correct() override { + const int array[] = {-1, -1, -1, 0, 1, 1, 1, 2, 2, 3}; + return makeInMemIterator(array); } - std::unique_ptr<IWIterator> correctReverse() override { - static auto data = makeDataForInMemIterator({3, 2, 2, 1, 1, 1, 0, -1, -1, -1}); - return makeInMemIterator(data); + std::shared_ptr<IWIterator> correctReverse() override { + const int array[] = {3, 2, 2, 1, 1, 1, 0, -1, -1, -1}; + return makeInMemIterator(array); } }; @@ -569,14 +584,12 @@ public: std::shuffle(_array.get(), _array.get() + NUM_ITEMS, _random.urbg()); } - Options adjustSortOptions(Options opts) override { + SortOptions adjustSortOptions(SortOptions opts) override { // Make sure we use a reasonable number of files when we spill MONGO_STATIC_ASSERT((NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT > 50); MONGO_STATIC_ASSERT((NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT < 500); - opts.maxMemoryUsageBytes = MEM_LIMIT; - - return opts; + return opts.MaxMemoryUsageBytes(MEM_LIMIT).ExtSortAllowed(); } void addData(IWSorter* sorter) override { @@ -588,11 +601,11 @@ public: return NUM_ITEMS; } - std::unique_ptr<IWIterator> correct() override { - return std::make_unique<IntIterator>(0, NUM_ITEMS); + std::shared_ptr<IWIterator> correct() override { + return std::make_shared<IntIterator>(0, NUM_ITEMS); } - std::unique_ptr<IWIterator> correctReverse() override { - return std::make_unique<IntIterator>(NUM_ITEMS - 1, -1, -1); + std::shared_ptr<IWIterator> correctReverse() override { + return std::make_shared<IntIterator>(NUM_ITEMS - 1, -1, -1); } size_t correctNumRanges() const override { @@ -613,7 +626,7 @@ public: template <long long Limit, bool Random = true> class LotsOfDataWithLimit : public LotsOfDataLittleMemory<Random> { typedef LotsOfDataLittleMemory<Random> Parent; - Options adjustSortOptions(Options opts) { + SortOptions adjustSortOptions(SortOptions opts) { // Make sure our tests will spill or not as desired MONGO_STATIC_ASSERT(MEM_LIMIT / 2 > (100 * sizeof(IWPair))); MONGO_STATIC_ASSERT(MEM_LIMIT < (5000 * sizeof(IWPair))); @@ -623,16 +636,13 @@ class LotsOfDataWithLimit : public LotsOfDataLittleMemory<Random> { MONGO_STATIC_ASSERT((Parent::NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT > 100); MONGO_STATIC_ASSERT((Parent::NUM_ITEMS * sizeof(IWPair)) / MEM_LIMIT < 500); - opts.maxMemoryUsageBytes = MEM_LIMIT; - opts.limit = Limit; - - return opts; + return opts.MaxMemoryUsageBytes(MEM_LIMIT).ExtSortAllowed().Limit(Limit); } - std::unique_ptr<IWIterator> correct() override { - return std::make_unique<LimitIterator>(Limit, Parent::correct()); + std::shared_ptr<IWIterator> correct() override { + return std::make_shared<LimitIterator>(Limit, Parent::correct()); } - std::unique_ptr<IWIterator> correctReverse() override { - return std::make_unique<LimitIterator>(Limit, Parent::correctReverse()); + std::shared_ptr<IWIterator> correctReverse() override { + return std::make_shared<LimitIterator>(Limit, Parent::correctReverse()); } size_t correctNumRanges() const override { // For the TopKSorter, the number of ranges depends on the specific composition of the data @@ -707,53 +717,55 @@ DEATH_TEST_F( SorterMakeFromExistingRangesTest, NonZeroLimit, "Creating a Sorter from existing ranges is only available with the NoLimitSorter (limit 0)") { - Options opts; - opts.limit = 1; - opts.tempDir = "unused_temp_dir"; - sorter::makeFromExistingRanges<IntWrapper, IntWrapper>("", {}, opts, IWComparator(ASC)); + auto opts = SortOptions().Limit(1ULL); + IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC)); } -DEATH_TEST_F(SorterMakeFromExistingRangesTest, ExtSortNotAllowed, "options.tempDir") { - Options opts; - ASSERT_FALSE(opts.tempDir); - sorter::makeFromExistingRanges<IntWrapper, IntWrapper>("", {}, opts, IWComparator(ASC)); +DEATH_TEST_F(SorterMakeFromExistingRangesTest, ExtSortNotAllowed, "opts.extSortAllowed") { + auto opts = SortOptions(); + ASSERT_FALSE(opts.extSortAllowed); + IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC)); +} + +DEATH_TEST_F(SorterMakeFromExistingRangesTest, EmptyTempDir, "!opts.tempDir.empty()") { + auto opts = SortOptions().ExtSortAllowed(); + ASSERT_EQUALS("", opts.tempDir); + IWSorter::makeFromExistingRanges("", {}, opts, IWComparator(ASC)); } DEATH_TEST_F(SorterMakeFromExistingRangesTest, EmptyFileName, "!fileName.empty()") { std::string fileName; - Options opts; - opts.tempDir = "unused_temp_dir"; - sorter::makeFromExistingRanges<IntWrapper, IntWrapper>(fileName, {}, opts, IWComparator(ASC)); + auto opts = SortOptions().ExtSortAllowed().TempDir("unused_temp_dir"); + IWSorter::makeFromExistingRanges(fileName, {}, opts, IWComparator(ASC)); } TEST_F(SorterMakeFromExistingRangesTest, SkipFileCheckingOnEmptyRanges) { auto fileName = "unused_sorter_file"; - Options opts; - opts.tempDir = "unused_temp_dir"; - auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>( - fileName, {}, opts, IWComparator(ASC)); + auto opts = SortOptions().ExtSortAllowed().TempDir("unused_temp_dir"); + auto sorter = std::unique_ptr<IWSorter>( + IWSorter::makeFromExistingRanges(fileName, {}, opts, IWComparator(ASC))); ASSERT_EQ(0, sorter->numSpills()); auto iter = std::unique_ptr<IWIterator>(sorter->done()); ASSERT_EQ(0, sorter->numSorted()); + iter->openSource(); ASSERT_FALSE(iter->more()); + iter->closeSource(); } TEST_F(SorterMakeFromExistingRangesTest, MissingFile) { auto fileName = "unused_sorter_file"; auto tempDir = "unused_temp_dir"; - Options opts; - opts.tempDir = tempDir; - auto makeSorter = [&] { - sorter::makeFromExistingRanges<IntWrapper, IntWrapper>( - fileName, makeSampleRanges(), opts, IWComparator(ASC)); - }; - ASSERT_THROWS_WITH_CHECK(makeSorter(), std::exception, [&](const auto& ex) { - ASSERT_STRING_CONTAINS(ex.what(), tempDir); - ASSERT_STRING_CONTAINS(ex.what(), fileName); - }); + auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir); + ASSERT_THROWS_WITH_CHECK( + IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC)), + std::exception, + [&](const auto& ex) { + ASSERT_STRING_CONTAINS(ex.what(), tempDir); + ASSERT_STRING_CONTAINS(ex.what(), fileName); + }); } TEST_F(SorterMakeFromExistingRangesTest, EmptyFile) { @@ -762,14 +774,12 @@ TEST_F(SorterMakeFromExistingRangesTest, EmptyFile) { ASSERT(std::ofstream(tempFilePath.string())) << "failed to create empty temporary file: " << tempFilePath.string(); auto fileName = tempFilePath.filename().string(); - Options opts; - opts.tempDir = tempDir.path(); - auto makeSorter = [&] { - sorter::makeFromExistingRanges<IntWrapper, IntWrapper>( - fileName, makeSampleRanges(), opts, IWComparator(ASC)); - }; + auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir.path()); // 16815 - unexpected empty file. - ASSERT_THROWS_CODE(makeSorter(), DBException, 16815); + ASSERT_THROWS_CODE( + IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC)), + DBException, + 16815); } TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) { @@ -781,10 +791,9 @@ TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) { ofs << "invalid sorter data"; } auto fileName = tempFilePath.filename().string(); - Options opts; - opts.tempDir = tempDir.path(); - auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>( - fileName, makeSampleRanges(), opts, IWComparator(ASC)); + auto opts = SortOptions().ExtSortAllowed().TempDir(tempDir.path()); + auto sorter = std::unique_ptr<IWSorter>( + IWSorter::makeFromExistingRanges(fileName, makeSampleRanges(), opts, IWComparator(ASC))); // The number of spills is set when NoLimitSorter is constructed from existing ranges. ASSERT_EQ(makeSampleRanges().size(), sorter->numSpills()); @@ -797,9 +806,10 @@ TEST_F(SorterMakeFromExistingRangesTest, CorruptedFile) { TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { unittest::TempDir tempDir(_agent.getSuiteName() + "_" + _agent.getTestName()); - Options opts; - opts.tempDir = tempDir.path(); - opts.maxMemoryUsageBytes = sizeof(IWSorter::Data); + auto opts = SortOptions() + .ExtSortAllowed() + .TempDir(tempDir.path()) + .MaxMemoryUsageBytes(sizeof(IWSorter::Data)); IWPair pairInsertedBeforeShutdown(1, 100); @@ -809,7 +819,7 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { IWSorter::PersistedState state; { auto sorterBeforeShutdown = - sorter::make<IntWrapper, IntWrapper>("sorter-test", opts, IWComparator(ASC)); + std::unique_ptr<IWSorter>(IWSorter::make(opts, IWComparator(ASC))); sorterBeforeShutdown->add(pairInsertedBeforeShutdown.first, pairInsertedBeforeShutdown.second); state = sorterBeforeShutdown->persistDataForShutdown(); @@ -819,8 +829,8 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { } // On restart, reconstruct sorter from persisted state. - auto sorter = sorter::makeFromExistingRanges<IntWrapper, IntWrapper>( - state.fileName, state.ranges, opts, IWComparator(ASC)); + auto sorter = std::unique_ptr<IWSorter>( + IWSorter::makeFromExistingRanges(state.fileName, state.ranges, opts, IWComparator(ASC))); // The number of spills is set when NoLimitSorter is constructed from existing ranges. ASSERT_EQ(state.ranges.size(), sorter->numSpills()); @@ -829,12 +839,13 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { IWPair pairInsertedAfterStartup(2, 200); sorter->add(pairInsertedAfterStartup.first, pairInsertedAfterStartup.second); - // Only the pair added after reconstructing the Sorter is counted. - ASSERT_EQ(1, sorter->numSorted()); + // Technically this sorter has not sorted anything. It is just merging files. + ASSERT_EQ(0, sorter->numSorted()); // Read data from sorter. { auto iter = std::unique_ptr<IWIterator>(sorter->done()); + iter->openSource(); ASSERT(iter->more()); auto pair1 = iter->next(); @@ -851,8 +862,10 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { << pair2.first << "/" << pair2.second; ASSERT_FALSE(iter->more()); + iter->closeSource(); } } } // namespace -} // namespace mongo::sorter +} // namespace sorter +} // namespace mongo diff --git a/src/mongo/db/sorter/spillable_sorter.h b/src/mongo/db/sorter/spillable_sorter.h deleted file mode 100644 index 724194ebe2c..00000000000 --- a/src/mongo/db/sorter/spillable_sorter.h +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/in_mem_iterator.h" -#include "mongo/db/sorter/merge_iterator.h" -#include "mongo/db/sorter/sorted_file_writer.h" -#include "mongo/db/sorter/sorter.h" - -namespace mongo::sorter { -/** - * A Sorter which may spill to disk if configured to do so. Each instance of this class will, if - * spilling is enabled, generate a file name and spill sorted data ranges to that file. - */ -template <typename Key, typename Value> -class SpillableSorter : public Sorter<Key, Value> { -public: - using Base = Sorter<Key, Value>; - using Data = typename Base::Data; - using Iterator = typename Base::Iterator; - using CompFn = typename Base::CompFn; - using Settings = typename Base::Settings; - - using Base::_comp; - using Base::_done; - - SpillableSorter(StringData name, - const Options& options, - const CompFn& comp, - const Settings& settings) - : SpillableSorter(options, - comp, - settings, - options.tempDir - ? std::make_unique<File>(*options.tempDir + "/" + nextFileName(name)) - : nullptr) {} - - SpillableSorter(const Options& options, - const CompFn& comp, - const Settings& settings, - const std::string& fileName) - : SpillableSorter( - options, comp, settings, std::make_unique<File>(*options.tempDir + "/" + fileName)) { - invariant(!fileName.empty()); - } - - std::unique_ptr<Iterator> done(typename Iterator::ReturnPolicy returnPolicy) { - invariant(!std::exchange(_done, true)); - - if (_spilled.empty()) { - _sort(); - return std::make_unique<InMemIterator<Key, Value>>(_data, returnPolicy); - } - - _spill(); - return std::make_unique<MergeIterator<Key, Value>>(_spilled, _options.limit, _comp); - } - - size_t numSpills() const override { - return _spilled.size(); - } - -protected: - virtual void _sort() = 0; - - void _spill() { - if (_data.empty()) { - return; - } - - if (!_options.tempDir) { - // This error message only applies to sorts from user queries made through the find or - // aggregation commands. Other clients should suppress this error, either by allowing - // external sorting or by catching and throwing a more appropriate error. - uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - str::stream() - << "Sort exceeded memory limit of " << _options.maxMemoryUsageBytes - << " bytes, but did not opt in to external sorting."); - } - - _sort(); - _updateStateAfterSort(); - - SortedFileWriter<Key, Value> writer(_file.get(), _options.dbName, _settings); - for (size_t i = 0; i < _data.size(); i++) { - writer.addAlreadySorted(_data[i].first, _data[i].second); - } - _spilled.push_back(writer.done()); - - // Clear _data and release backing array's memory. - std::vector<Data>().swap(_data); - _memUsed = 0; - } - - virtual void _updateStateAfterSort() {} - - const Options _options; - const std::function<bool(const Data&, const Data&)> _less; - const Settings _settings; - - std::vector<Data> _data; - - std::unique_ptr<File> _file; - std::vector<std::unique_ptr<Iterator>> _spilled; - - size_t _memUsed = 0; - -private: - SpillableSorter(const Options& options, - const CompFn& comp, - const Settings& settings, - std::unique_ptr<File> file) - : Base(comp), - _options(options), - _less([comp](const Data& lhs, const Data& rhs) { - dassertCompIsSane(comp, lhs, rhs); - return comp(lhs, rhs) < 0; - }), - _settings(settings), - _file(std::move(file)) {} -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/top_k_sorter.h b/src/mongo/db/sorter/top_k_sorter.h deleted file mode 100644 index 9a0569171fb..00000000000 --- a/src/mongo/db/sorter/top_k_sorter.h +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/sorter/sorter.h" - -#include "mongo/db/sorter/spillable_sorter.h" - -namespace mongo::sorter { -template <typename Key, typename Value> -class TopKSorter : public SpillableSorter<Key, Value> { -public: - using Base = SpillableSorter<Key, Value>; - using Data = typename Base::Data; - using Iterator = typename Base::Iterator; - using CompFn = typename Base::CompFn; - using Settings = typename Base::Settings; - - using Base::_data; - using Base::_done; - using Base::_less; - using Base::_memUsed; - using Base::_numSorted; - using Base::_options; - using Base::_spill; - using Base::_totalDataSizeSorted; - - TopKSorter(StringData name, - const Options& options, - const CompFn& comp, - const Settings& settings) - : Base(name, options, comp, settings) { - // This also works with limit 1, but LimitOneSorter should be used instead for that case. - invariant(options.limit > 1); - - // Preallocate a fixed sized vector of the required size if we don't expect it to have a - // major impact on our memory budget. This is the common case with small limits. - if (options.limit < std::min((options.maxMemoryUsageBytes / 10) / - sizeof(typename decltype(_data)::value_type), - _data.max_size())) { - _data.reserve(options.limit); - } - } - - void add(const Key& key, const Value& val) { - invariant(!_done); - - ++_numSorted; - - Data contender{key, val}; - - if (_data.size() < _options.limit) { - if (_haveCutoff && !_less(contender, _cutoff)) { - return; - } - - _data.emplace_back(contender.first.getOwned(), contender.second.getOwned()); - - auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); - _memUsed += memUsage; - _totalDataSizeSorted += memUsage; - - if (_data.size() == _options.limit) { - std::make_heap(_data.begin(), _data.end(), _less); - } - - if (_memUsed > _options.maxMemoryUsageBytes) { - _spill(); - } - - return; - } - - invariant(_data.size() == _options.limit); - - if (!_less(contender, _data.front())) { - return; - } - - // Remove the old worst pair and insert the contender, adjusting _memUsed. - - auto memUsage = key.memUsageForSorter() + val.memUsageForSorter(); - _memUsed += memUsage; - _totalDataSizeSorted += memUsage; - - _memUsed -= _data.front().first.memUsageForSorter(); - _memUsed -= _data.front().second.memUsageForSorter(); - - std::pop_heap(_data.begin(), _data.end(), _less); - _data.back() = {contender.first.getOwned(), contender.second.getOwned()}; - std::push_heap(_data.begin(), _data.end(), _less); - - if (_memUsed > _options.maxMemoryUsageBytes) { - _spill(); - } - } - -private: - void _sort() { - if (_data.size() == _options.limit) { - std::sort_heap(_data.begin(), _data.end(), _less); - } else { - std::stable_sort(_data.begin(), _data.end(), _less); - } - } - - // Can only be called after _data is sorted - void _updateStateAfterSort() override { - // Theory of operation: We want to be able to eagerly ignore values we know will not - // be in the TopK result set by setting _cutoff to a value we know we have at least - // K values equal to or better than. There are two values that we track to - // potentially become the next value of _cutoff: _worstSeen and _lastMedian. When - // one of these values becomes the new _cutoff, its associated counter is reset to 0 - // and a new value is chosen for that member the next time we spill. - // - // _worstSeen is the worst value we've seen so that all kept values are better than - // (or equal to) it. This means that once _worstCount >= _opts.limit there is no - // reason to consider values worse than _worstSeen so it can become the new _cutoff. - // This technique is especially useful when the input is already roughly sorted (eg - // sorting ASC on an ObjectId or Date field) since we will quickly find a cutoff - // that will exclude most later values, making the full TopK operation including - // the MergeIterator phase is O(K) in space and O(N + K*Log(K)) in time. - // - // _lastMedian was the median of the _data in the first spill() either overall or - // following a promotion of _lastMedian to _cutoff. We count the number of kept - // values that are better than or equal to _lastMedian in _medianCount and can - // promote _lastMedian to _cutoff once _medianCount >=_opts.limit. Assuming - // reasonable median selection (which should happen when the data is completely - // unsorted), after the first K spilled values, we will keep roughly 50% of the - // incoming values, 25% after the second K, 12.5% after the third K, etc. This means - // that by the time we spill 3*K values, we will have seen (1*K + 2*K + 4*K) values, - // so the expected number of kept values is O(Log(N/K) * K). The final run time if - // using the O(K*Log(N)) merge algorithm in MergeIterator is O(N + K*Log(K) + - // K*LogLog(N/K)) which is much closer to O(N) than O(N*Log(K)). - // - // This leaves a currently unoptimized worst case of data that is already roughly - // sorted, but in the wrong direction, such that the desired results are all the - // last ones seen. It will require O(N) space and O(N*Log(K)) time. Since this - // should be trivially detectable, as a future optimization it might be nice to - // detect this case and reverse the direction of input (if possible) which would - // turn this into the best case described above. - // - // Pedantic notes: The time complexities above (which count number of comparisons) - // ignore the sorting of batches prior to spilling to disk since they make it more - // confusing without changing the results. If you want to add them back in, add an - // extra term to each time complexity of (SPACE_COMPLEXITY * Log(BATCH_SIZE)). Also, - // all space complexities measure disk space rather than memory since this class is - // O(1) in memory due to the _opts.maxMemoryUsageBytes limit. - - // Pick a new _worstSeen or _lastMedian if should. - if (_worstCount == 0 || _less(_worstSeen, _data.back())) { - _worstSeen = _data.back(); - } - if (_medianCount == 0) { - size_t medianIndex = _data.size() / 2; // Chooses the higher if size is even. - _lastMedian = _data[medianIndex]; - } - - // Add the counters of kept objects better than or equal to _worstSeen/_lastMedian. - _worstCount += _data.size(); // Everything is better or equal. - typename std::vector<Data>::iterator firstWorseThanLastMedian = - std::upper_bound(_data.begin(), _data.end(), _lastMedian, _less); - _medianCount += std::distance(_data.begin(), firstWorseThanLastMedian); - - - // Promote _worstSeen or _lastMedian to _cutoff and reset counters if we should. - if (_worstCount >= _options.limit) { - if (!_haveCutoff || _less(_worstSeen, _cutoff)) { - _cutoff = _worstSeen; - _haveCutoff = true; - } - _worstCount = 0; - } - if (_medianCount >= _options.limit) { - if (!_haveCutoff || _less(_lastMedian, _cutoff)) { - _cutoff = _lastMedian; - _haveCutoff = true; - } - _medianCount = 0; - } - } - - // See updateCutoff() for a full description of how these members are used. - bool _haveCutoff = false; - Data _cutoff; // We can definitely ignore values worse than this. - Data _worstSeen; // The worst Data seen so far. Reset when _worstCount >= _opts.limit. - size_t _worstCount = 0; // Number of docs better or equal to _worstSeen kept so far. - Data _lastMedian; // Median of a batch. Reset when _medianCount >= _opts.limit. - size_t _medianCount = 0; // Number of docs better or equal to _lastMedian kept so far. -}; -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/util.cpp b/src/mongo/db/sorter/util.cpp deleted file mode 100644 index f4cfc13e0f1..00000000000 --- a/src/mongo/db/sorter/util.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/sorter/util.h" - -#include "mongo/platform/random.h" - -namespace mongo::sorter { -std::string nextFileName(StringData name) { - static AtomicWord<unsigned> fileCounter; - static const auto randomSuffix = SecureRandom{}.nextInt64(); - return str::stream() << "extsort-" << name << "." << fileCounter.fetchAndAdd(1) << '-' - << randomSuffix; -} - -uint32_t addDataToChecksum(const void* data, std::size_t size, uint32_t checksum) { - uint32_t newChecksum; - MurmurHash3_x86_32(data, size, checksum, &newChecksum); - return newChecksum; -} - -EncryptionHooks* getEncryptionHooksIfEnabled() { - // Some tests may not run with a global service context. - if (!hasGlobalServiceContext()) { - return nullptr; - } - auto service = getGlobalServiceContext(); - auto encryptionHooks = EncryptionHooks::get(service); - if (!encryptionHooks->enabled()) { - return nullptr; - } - return encryptionHooks; -} -} // namespace mongo::sorter diff --git a/src/mongo/db/sorter/util.h b/src/mongo/db/sorter/util.h deleted file mode 100644 index 3ff9d17a1b9..00000000000 --- a/src/mongo/db/sorter/util.h +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <cstdint> -#include <third_party/murmurhash3/MurmurHash3.h> - -#include "mongo/config.h" -#include "mongo/db/service_context.h" -#include "mongo/db/sorter/options.h" -#include "mongo/db/storage/encryption_hooks.h" - -namespace mongo::sorter { -/** - * Generates a new file name on each call using a static, atomic and monotonically increasing - * number. Each name is suffixed with a random number generated at startup, to prevent name - * collisions when the external sort files are preserved across restarts. - */ -std::string nextFileName(StringData name); - -/** - * Calculates and returns a new murmur hash value based on the prior murmur hash and a new piece - * of data. - */ -uint32_t addDataToChecksum(const void* data, std::size_t size, uint32_t checksum); - -/** - * Returns the current EncryptionHooks registered with the global service context. - * Returns nullptr if the service context is not available; or if the EncyptionHooks - * registered is not enabled. - */ -EncryptionHooks* getEncryptionHooksIfEnabled(); - -template <typename Data> -void dassertCompIsSane(const std::function<int(const Data& lhs, const Data& rhs)>& comp, - const Data& lhs, - const Data& rhs) { -#if defined(MONGO_CONFIG_DEBUG_BUILD) && !defined(_MSC_VER) - // MSVC++ already does similar verification in debug mode in addition to using algorithms that - // do more comparisons. Doing our own verification in addition makes debug builds considerably - // slower without any additional safety. - - // Test reversed comparisons. - const int regular = comp(lhs, rhs); - if (regular == 0) { - invariant(comp(rhs, lhs) == 0); - } else if (regular < 0) { - invariant(comp(rhs, lhs) > 0); - } else { - invariant(comp(rhs, lhs) < 0); - } - - // Test reflexivity. - invariant(comp(lhs, lhs) == 0); - invariant(comp(rhs, rhs) == 0); -#endif -} -} // namespace mongo::sorter |