diff options
author | Misha Ivkov <misha.ivkov@10gen.com> | 2019-07-02 18:18:37 -0400 |
---|---|---|
committer | Misha Ivkov <misha.ivkov@10gen.com> | 2019-07-23 16:28:11 -0400 |
commit | a8e3a6317dc2579b90523fc8b47fd535db776939 (patch) | |
tree | 6b8fec33d63c2feb6b1e691cbc26c6d965d228cd | |
parent | 5020e611bbffd51bb8cebd3790f8537b7eb4c03c (diff) | |
download | mongo-a8e3a6317dc2579b90523fc8b47fd535db776939.tar.gz |
SERVER-41960 Refactor DocumentSourceSort logic into SortExecutor
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/exec/sort_executor.cpp | 149 | ||||
-rw-r--r-- | src/mongo/db/exec/sort_executor.h | 118 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 190 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 66 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/query/sort_pattern.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/query/sort_pattern.h | 2 |
12 files changed, 368 insertions, 200 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e52b3d792ea..b4721e6a2d3 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1036,6 +1036,7 @@ env.Library( 'db_raii', 'dbdirectclient', 'exec/scoped_timer', + 'exec/sort_executor', 'exec/working_set', 'fts/base_fts', 'index/index_descriptor', diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index a790d0e21a1..7ee2806711e 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -28,6 +28,22 @@ env.Library( ], ) +sortExecutorEnv = env.Clone() +sortExecutorEnv.InjectThirdParty(libraries=['snappy']) +sortExecutorEnv.Library( + target="sort_executor", + source=[ + "sort_executor.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/sort_pattern', + '$BUILD_DIR/mongo/db/storage/encryption_hooks', + '$BUILD_DIR/mongo/db/storage/storage_options', + '$BUILD_DIR/mongo/s/is_mongos', + '$BUILD_DIR/third_party/shim_snappy', + ], +) + env.Library( target='stagedebug_cmd', source=[ diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp new file mode 100644 index 00000000000..8fd20cfda46 --- /dev/null +++ b/src/mongo/db/exec/sort_executor.cpp @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/sort_executor.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { +namespace { +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicWord<unsigned> sortExecutorFileCounter; + return "extsort-sort-executor." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1)); +} +} // namespace + +SortExecutor::SortExecutor(SortPattern sortPattern, + uint64_t limit, + uint64_t maxMemoryUsageBytes, + std::string tempDir, + bool allowDiskUse) + : _sortPattern(std::move(sortPattern)), + _limit(limit), + _maxMemoryUsageBytes(maxMemoryUsageBytes), + _tempDir(std::move(tempDir)), + _diskUseAllowed(allowDiskUse) {} + +int SortExecutor::Comparator::operator()(const DocumentSorter::Data& lhs, + const DocumentSorter::Data& rhs) const { + Value lhsKey = lhs.first; + Value rhsKey = rhs.first; + // DocumentSourceSort::populate() has already guaranteed that the sort key is non-empty. + // However, the tricky part is deciding what to do if none of the sort keys are present. In that + // case, consider the document "less". + // + // Note that 'comparator' must use binary comparisons here, as both 'lhs' and 'rhs' are + // collation comparison keys. + ValueComparator comparator; + const size_t n = _sort.size(); + if (n == 1) { // simple fast case + if (_sort[0].isAscending) + return comparator.compare(lhsKey, rhsKey); + else + return -comparator.compare(lhsKey, rhsKey); + } + + // compound sort + for (size_t i = 0; i < n; i++) { + int cmp = comparator.compare(lhsKey[i], rhsKey[i]); + if (cmp) { + /* if necessary, adjust the return value by the key ordering */ + if (!_sort[i].isAscending) + cmp = -cmp; + + return cmp; + } + } + + /* + If we got here, everything matched (or didn't exist), so we'll + consider the documents equal for purposes of this sort. + */ + return 0; +} + +boost::optional<Document> SortExecutor::getNext() { + if (_isEOF) { + return boost::none; + } + + if (!_output->more()) { + _output.reset(); + _isEOF = true; + return boost::none; + } + + return _output->next().second; +} + +void SortExecutor::add(Value sortKey, Document data) { + if (!_sorter) { + _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); + } + _sorter->add(std::move(sortKey), std::move(data)); +} + +void SortExecutor::loadingDone() { + // This conditional should only pass if no documents were added to the sorter. + if (!_sorter) { + _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); + } + _output.reset(_sorter->done()); + _wasDiskUsed = _wasDiskUsed || _sorter->usedDisk(); + _sorter.reset(); +} + +SortOptions SortExecutor::makeSortOptions() const { + SortOptions opts; + if (_limit) { + opts.limit = _limit; + } + + opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; + if (_diskUseAllowed) { + opts.extSortAllowed = true; + opts.tempDir = _tempDir; + } + + return opts; +} +} // namespace mongo + +#include "mongo/db/sorter/sorter.cpp" +// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h new file mode 100644 index 00000000000..95741f41256 --- /dev/null +++ b/src/mongo/db/exec/sort_executor.h @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/query/sort_pattern.h" +#include "mongo/db/sorter/sorter.h" + +namespace mongo { +/** + * The SortExecutor class is the internal implementation of sorting for query execution. The + * caller should provide input documents by repeated calls to the add() function, and then + * complete the loading process with a single call to loadingDone(). Finally, getNext() should be + * called to return the documents one by one in sorted order. + */ +class SortExecutor { +public: + /** + * If the passed in limit is 0, this is treated as no limit. + */ + SortExecutor(SortPattern sortPattern, + uint64_t limit, + uint64_t maxMemoryUsageBytes, + std::string tempDir, + bool allowDiskUse); + + boost::optional<Document> getNext(); + + const SortPattern& sortPattern() const { + return _sortPattern; + } + + /** + * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep + * the smallest limit. + */ + void setLimit(uint64_t limit) { + if (!_limit || limit < _limit) + _limit = limit; + } + + uint64_t getLimit() const { + return _limit; + } + + bool hasLimit() const { + return _limit > 0; + } + + bool wasDiskUsed() const { + return _wasDiskUsed; + } + + /** + * Signals to the sort executor that there will be no more input documents. + */ + void loadingDone(); + + /** + * Add a Document with sort key specified by Value to the DocumentSorter. + */ + void add(Value, Document); + +private: + using DocumentSorter = Sorter<Value, Document>; + class Comparator { + public: + Comparator(const SortPattern& sortPattern) : _sort(sortPattern) {} + int operator()(const DocumentSorter::Data& lhs, const DocumentSorter::Data& rhs) const; + + private: + const SortPattern& _sort; + }; + + SortOptions makeSortOptions() const; + + SortPattern _sortPattern; + // A limit of zero is defined as no limit. + uint64_t _limit; + uint64_t _maxMemoryUsageBytes; + std::string _tempDir; + bool _diskUseAllowed = false; + + std::unique_ptr<DocumentSorter> _sorter; + std::unique_ptr<DocumentSorter::Iterator> _output; + + bool _isEOF = false; + bool _wasDiskUsed = false; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 984bdbdf89c..2d71d9dfadc 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -269,9 +269,9 @@ env.Library( ], ) -pipelineeEnv = env.Clone() -pipelineeEnv.InjectThirdParty(libraries=['snappy']) -pipelineeEnv.Library( +pipelineEnv = env.Clone() +pipelineEnv.InjectThirdParty(libraries=['snappy']) +pipelineEnv.Library( target='pipeline', source=[ 'document_source.cpp', @@ -330,6 +330,7 @@ pipelineeEnv.Library( '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', + '$BUILD_DIR/mongo/db/exec/sort_executor', '$BUILD_DIR/mongo/db/generic_cursor', '$BUILD_DIR/mongo/db/index/key_generator', '$BUILD_DIR/mongo/db/logical_session_cache', diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index da86580ef02..ecb3bf08007 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -397,7 +397,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) { const int numBuckets = 2; auto bucketAutoStage = DocumentSourceBucketAuto::create( expCtx, groupByExpression, numBuckets, {}, nullptr, maxMemoryUsageBytes); - auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes); string largeStr(maxMemoryUsageBytes, 'x'); auto mock = diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 670f6165277..af89701d885 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -96,28 +96,27 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) { return Value{std::move(keys)}; } -/** - * Generates a new file name on each call using a static, atomic and monotonically increasing - * number. - * - * Each user of the Sorter must implement this function to ensure that all temporary files that the - * Sorter instances produce are uniquely identified using a unique file name extension with separate - * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple - * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. - */ -std::string nextFileName() { - static AtomicWord<unsigned> documentSourceSortFileCounter; - return "extsort-doc-source-sort." + - std::to_string(documentSourceSortFileCounter.fetchAndAdd(1)); -} - } // namespace constexpr StringData DocumentSourceSort::kStageName; -DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx, - const BSONObj& sortOrder) - : DocumentSource(pExpCtx), _rawSort(sortOrder), _sortPattern({_rawSort, pExpCtx}) {} +DocumentSourceSort::DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const BSONObj& sortOrder, + uint64_t limit, + uint64_t maxMemoryUsageBytes) + : DocumentSource(pExpCtx), + _sortExecutor({{sortOrder, pExpCtx}, + limit, + maxMemoryUsageBytes, + pExpCtx->tempDir, + pExpCtx->allowDiskUse}), + // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort + // by a metadata field. + _sortKeyGen({sortOrder, pExpCtx->getCollator()}) { + uassert(15976, + "$sort stage must have at least one sort key", + !_sortExecutor->sortPattern().empty()); +} REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, @@ -134,39 +133,37 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() { invariant(populationResult.isEOF()); } - if (!_output || !_output->more()) { - dispose(); + auto result = _sortExecutor->getNext(); + if (!result) return GetNextResult::makeEOF(); - } - - return _output->next().second; + return GetNextResult(std::move(*result)); } void DocumentSourceSort::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { + uint64_t limit = _sortExecutor->getLimit(); if (explain) { // always one Value for combined $sort + $limit array.push_back(Value(DOC( - kStageName << DOC( - "sortKey" << _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain) - << "limit" - << (_limitSrc ? Value(_limitSrc->getLimit()) : Value()))))); + kStageName << DOC("sortKey" + << _sortExecutor->sortPattern().serialize( + SortPattern::SortKeySerialization::kForExplain) + << "limit" + << (_sortExecutor->hasLimit() ? Value(static_cast<long long>(limit)) + : Value()))))); } else { // one Value for $sort and maybe a Value for $limit - MutableDocument inner( - _sortPattern.serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)); + MutableDocument inner(_sortExecutor->sortPattern().serialize( + SortPattern::SortKeySerialization::kForPipelineSerialization)); array.push_back(Value(DOC(kStageName << inner.freeze()))); - if (_limitSrc) { - _limitSrc->serializeToArray(array); + if (_sortExecutor->hasLimit()) { + auto limitSrc = DocumentSourceLimit::create(pExpCtx, limit); + limitSrc->serializeToArray(array); } } } -void DocumentSourceSort::doDispose() { - _output.reset(); -} - long long DocumentSourceSort::getLimit() const { - return _limitSrc ? _limitSrc->getLimit() : -1; + return _sortExecutor->hasLimit() ? _sortExecutor->getLimit() : -1; } Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( @@ -182,14 +179,13 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( int64_t safeSum = 0; // The skip and limit values can be very large, so we need to make sure the sum doesn't - // overflow before applying an optimiztion to pull the limit into the sort stage. + // overflow before applying an optimization to pull the limit into the sort stage. if (nextSkip && !mongoSignedAddOverflow64(skipSum, nextSkip->getSkip(), &safeSum)) { skipSum = safeSum; ++stageItr; } else if (nextLimit && !mongoSignedAddOverflow64(nextLimit->getLimit(), skipSum, &safeSum)) { - nextLimit->setLimit(safeSum); - setLimitSrc(nextLimit); + _sortExecutor->setLimit(safeSum); container->erase(stageItr); stageItr = std::next(itr); skipSum = 0; @@ -204,7 +200,7 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( } DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const { - for (auto&& keyPart : _sortPattern) { + for (auto&& keyPart : _sortExecutor->sortPattern()) { if (keyPart.expression) { keyPart.expression->addDependencies(deps); } else { @@ -229,46 +225,17 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson( intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( const intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, - long long limit, + uint64_t limit, boost::optional<uint64_t> maxMemoryUsageBytes) { - intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx, sortOrder.getOwned())); - pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes + auto resolvedMaxBytes = maxMemoryUsageBytes ? *maxMemoryUsageBytes : internalDocumentSourceSortMaxBlockingSortBytes.load(); - - uassert(15976, "$sort stage must have at least one sort key", !pSort->_sortPattern.empty()); - - pSort->_sortKeyGen = SortKeyGenerator{ - // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort - // by a metadata field. - pSort->_sortPattern.serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) - .toBson(), - pExpCtx->getCollator()}; - - if (limit > 0) { - pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit)); - } + intrusive_ptr<DocumentSourceSort> pSort( + new DocumentSourceSort(pExpCtx, sortOrder.getOwned(), limit, resolvedMaxBytes)); return pSort; } -SortOptions DocumentSourceSort::makeSortOptions() const { - /* make sure we've got a sort key */ - verify(_sortPattern.size()); - - SortOptions opts; - if (_limitSrc) - opts.limit = _limitSrc->getLimit(); - - opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) { - opts.extSortAllowed = true; - opts.tempDir = pExpCtx->tempDir; - } - - return opts; -} - DocumentSource::GetNextResult DocumentSourceSort::populate() { auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { @@ -282,9 +249,6 @@ DocumentSource::GetNextResult DocumentSourceSort::populate() { void DocumentSourceSort::loadDocument(Document&& doc) { invariant(!_populated); - if (!_sorter) { - _sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this))); - } Value sortKey; Document docForSorter; @@ -292,21 +256,16 @@ void DocumentSourceSort::loadDocument(Document&& doc) { // already computed the sort key we'd have split the pipeline there, would be merging presorted // documents, and wouldn't use this method. std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc)); - _sorter->add(sortKey, docForSorter); + _sortExecutor->add(sortKey, docForSorter); } void DocumentSourceSort::loadingDone() { - if (!_sorter) { - _sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this))); - } - _output.reset(_sorter->done()); - _usedDisk = _sorter->usedDisk() || _usedDisk; - _sorter.reset(); + _sortExecutor->loadingDone(); _populated = true; } bool DocumentSourceSort::usedDisk() { - return _usedDisk; + return _sortExecutor->wasDiskUsed(); } Value DocumentSourceSort::getCollationComparisonKey(const Value& val) const { @@ -358,13 +317,13 @@ StatusWith<Value> DocumentSourceSort::extractKeyPart( } StatusWith<Value> DocumentSourceSort::extractKeyFast(const Document& doc) const { - if (_sortPattern.size() == 1u) { - return extractKeyPart(doc, _sortPattern[0]); + if (_sortExecutor->sortPattern().size() == 1u) { + return extractKeyPart(doc, _sortExecutor->sortPattern()[0]); } vector<Value> keys; - keys.reserve(_sortPattern.size()); - for (auto&& keyPart : _sortPattern) { + keys.reserve(_sortExecutor->sortPattern().size()); + for (auto&& keyPart : _sortExecutor->sortPattern()) { auto extractedKey = extractKeyPart(doc, keyPart); if (!extractedKey.isOK()) { // We can't use the fast path, so bail out. @@ -387,7 +346,7 @@ BSONObj DocumentSourceSort::extractKeyWithArray(const Document& doc) const { // Convert the Document to a BSONObj, but only do the conversion for the paths we actually need. // Then run the result through the SortKeyGenerator to obtain the final sort key. - auto bsonDoc = _sortPattern.documentToBsonWithSortPaths(doc); + auto bsonDoc = _sortExecutor->sortPattern().documentToBsonWithSortPaths(doc); return uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata)); } @@ -403,7 +362,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co if (fastKey.isOK()) { inMemorySortKey = std::move(fastKey.getValue()); if (pExpCtx->needsMerge) { - serializedSortKey = serializeSortKey(_sortPattern.size(), inMemorySortKey); + serializedSortKey = + serializeSortKey(_sortExecutor->sortPattern().size(), inMemorySortKey); } } else { // We have to do it the slow way - through the sort key generator. This will generate a BSON @@ -411,7 +371,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co // representation into the corresponding array of keys as a Value. BSONObj {'': 1, '': [2, // 3]} becomes Value [1, [2, 3]]. serializedSortKey = extractKeyWithArray(doc); - inMemorySortKey = deserializeSortKey(_sortPattern.size(), *serializedSortKey); + inMemorySortKey = + deserializeSortKey(_sortExecutor->sortPattern().size(), *serializedSortKey); } MutableDocument toBeSorted(std::move(doc)); @@ -424,48 +385,14 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co return {inMemorySortKey, toBeSorted.freeze()}; } -int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { - // DocumentSourceSort::populate() has already guaranteed that the sort key is non-empty. - // However, the tricky part is deciding what to do if none of the sort keys are present. In that - // case, consider the document "less". - // - // Note that 'comparator' must use binary comparisons here, as both 'lhs' and 'rhs' are - // collation comparison keys. - ValueComparator comparator; - const size_t n = _sortPattern.size(); - if (n == 1) { // simple fast case - if (_sortPattern[0].isAscending) - return comparator.compare(lhs, rhs); - else - return -comparator.compare(lhs, rhs); - } - - // compound sort - for (size_t i = 0; i < n; i++) { - int cmp = comparator.compare(lhs[i], rhs[i]); - if (cmp) { - /* if necessary, adjust the return value by the key ordering */ - if (!_sortPattern[i].isAscending) - cmp = -cmp; - - return cmp; - } - } - - /* - If we got here, everything matched (or didn't exist), so we'll - consider the documents equal for purposes of this sort. - */ - return 0; -} - boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() { DistributedPlanLogic split; split.shardsStage = this; - split.inputSortPattern = - _sortPattern.serialize(SortPattern::SortKeySerialization::kForSortKeyMerging).toBson(); - if (_limitSrc) { - split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()); + split.inputSortPattern = _sortExecutor->sortPattern() + .serialize(SortPattern::SortKeySerialization::kForSortKeyMerging) + .toBson(); + if (_sortExecutor->hasLimit()) { + split.mergingStage = DocumentSourceLimit::create(pExpCtx, getLimit()); } return split; } @@ -482,6 +409,3 @@ bool DocumentSourceSort::canRunInParallelBeforeWriteStage( } } // namespace mongo - -#include "mongo/db/sorter/sorter.cpp" -// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 519b61a65e8..44797e3dcf3 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/sort_executor.h" #include "mongo/db/index/sort_key_generator.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_limit.h" @@ -69,7 +70,7 @@ public: ChangeStreamRequirement::kBlacklist); // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. - constraints.canSwapWithMatch = !_limitSrc; + constraints.canSwapWithMatch = !_sortExecutor->hasLimit(); return constraints; } @@ -83,7 +84,7 @@ public: * Returns the sort key pattern. */ const SortPattern& getSortKeyPattern() const { - return _sortPattern; + return _sortExecutor->sortPattern(); } /** @@ -99,7 +100,7 @@ public: static boost::intrusive_ptr<DocumentSourceSort> create( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj sortOrder, - long long limit = -1, + uint64_t limit = 0, boost::optional<uint64_t> maxMemoryUsageBytes = boost::none); /** @@ -125,45 +126,26 @@ public: */ bool usedDisk() final; - /** - * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that - * have already been sorted. - */ - void populateFromCursors(const std::vector<DBClientCursor*>& cursors); - bool isPopulated() { return _populated; }; - boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const { - return _limitSrc; + bool hasLimit() const { + return _sortExecutor->hasLimit(); } protected: /** - * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort. + * Attempts to absorb a subsequent $limit stage so that it can perform a top-k sort. */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - void doDispose() final; private: - using MySorter = Sorter<Value, Document>; - - // For MySorter. - class Comparator { - public: - explicit Comparator(const DocumentSourceSort& source) : _source(source) {} - int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const { - return _source.compare(lhs.first, rhs.first); - } - - private: - const DocumentSourceSort& _source; - }; - - explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - const BSONObj& sortOrder); + DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const BSONObj& sortOrder, + uint64_t limit, + uint64_t maxMemoryUsageBytes); Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { MONGO_UNREACHABLE; // Should call serializeToArray instead. @@ -179,8 +161,6 @@ private: */ GetNextResult populate(); - SortOptions makeSortOptions() const; - /** * Returns the sort key for 'doc', as well as the document that should be entered into the * sorter to eventually be returned. If we will need to later merge the sorted results with @@ -218,33 +198,11 @@ private: */ Value getCollationComparisonKey(const Value& val) const; - int compare(const Value& lhs, const Value& rhs) const; - - /** - * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep - * the smallest limit. - */ - void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) { - if (!_limitSrc || limit->getLimit() < _limitSrc->getLimit()) { - _limitSrc = limit; - } - } - bool _populated = false; - BSONObj _rawSort; + boost::optional<SortExecutor> _sortExecutor; boost::optional<SortKeyGenerator> _sortKeyGen; - - SortPattern _sortPattern; - - boost::intrusive_ptr<DocumentSourceLimit> _limitSrc; - - uint64_t _maxMemoryUsageBytes; - bool _done; - std::unique_ptr<MySorter> _sorter; - std::unique_ptr<MySorter::Iterator> _output; - bool _usedDisk = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index b73d90a2d1a..00a4dad3c70 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -403,7 +403,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled) expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; - auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes); string largeStr(maxMemoryUsageBytes, 'x'); auto mock = @@ -438,7 +438,7 @@ TEST_F(DocumentSourceSortExecutionTest, expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; - auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes); string largeStr(maxMemoryUsageBytes, 'x'); auto mock = DocumentSourceMock::createForTest({Document{{"_id", 0}, {"largeStr", largeStr}}, @@ -453,7 +453,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPa expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; - auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); + auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes); string largeStr(maxMemoryUsageBytes / 2, 'x'); auto mock = diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 3a27c6ee73f..d6e3557bcd9 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -414,7 +414,7 @@ getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) { if (sourcesIt != sources.end()) { sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get()); if (sortStage) { - if (!sortStage->getLimitSrc()) { + if (!sortStage->hasLimit()) { ++sourcesIt; } else { // This $sort stage was previously followed by a $limit stage. @@ -664,7 +664,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep if (swExecutorGrouped.isOK()) { // Any $limit stage before the $group stage should make the pipeline ineligible for this // optimization. - invariant(!sortStage || !sortStage->getLimitSrc()); + invariant(!sortStage || !sortStage->hasLimit()); // We remove the $sort and $group stages that begin the pipeline, because the executor // will handle the sort, and the groupTransform (added below) will handle the $group @@ -751,9 +751,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // We know the sort is being handled by the query system, so remove the $sort stage. pipeline->_sources.pop_front(); - if (sortStage->getLimitSrc()) { + if (sortStage->hasLimit()) { // We need to reinsert the coalesced $limit after removing the $sort. - pipeline->_sources.push_front(sortStage->getLimitSrc()); + pipeline->_sources.push_front( + DocumentSourceLimit::create(expCtx, sortStage->getLimit())); } return std::move(exec); } else if (swExecutorSort == ErrorCodes::QueryPlanKilled) { diff --git a/src/mongo/db/query/sort_pattern.cpp b/src/mongo/db/query/sort_pattern.cpp index 506ee856805..3d1fe685d04 100644 --- a/src/mongo/db/query/sort_pattern.cpp +++ b/src/mongo/db/query/sort_pattern.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/query/sort_pattern.h" namespace mongo { diff --git a/src/mongo/db/query/sort_pattern.h b/src/mongo/db/query/sort_pattern.h index a8d57efd114..aa6cd2a1564 100644 --- a/src/mongo/db/query/sort_pattern.h +++ b/src/mongo/db/query/sort_pattern.h @@ -33,10 +33,8 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/expression.h" -#include "mongo/db/pipeline/field_path.h" namespace mongo { - class SortPattern { public: enum class SortKeySerialization { |