summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Misha Ivkov <misha.ivkov@10gen.com> 2019-07-02 18:18:37 -0400
committer Misha Ivkov <misha.ivkov@10gen.com> 2019-07-23 16:28:11 -0400
commit a8e3a6317dc2579b90523fc8b47fd535db776939 (patch)
tree 6b8fec33d63c2feb6b1e691cbc26c6d965d228cd
parent 5020e611bbffd51bb8cebd3790f8537b7eb4c03c (diff)
download mongo-a8e3a6317dc2579b90523fc8b47fd535db776939.tar.gz
SERVER-41960 Refactor DocumentSourceSort logic into SortExecutor
-rw-r--r-- src/mongo/db/SConscript 1
-rw-r--r-- src/mongo/db/exec/SConscript 16
-rw-r--r-- src/mongo/db/exec/sort_executor.cpp 149
-rw-r--r-- src/mongo/db/exec/sort_executor.h 118
-rw-r--r-- src/mongo/db/pipeline/SConscript 7
-rw-r--r-- src/mongo/db/pipeline/document_source_bucket_auto_test.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.cpp 190
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.h 66
-rw-r--r-- src/mongo/db/pipeline/document_source_sort_test.cpp 6
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.cpp 9
-rw-r--r-- src/mongo/db/query/sort_pattern.cpp 2
-rw-r--r-- src/mongo/db/query/sort_pattern.h 2
12 files changed, 368 insertions, 200 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index e52b3d792ea..b4721e6a2d3 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1036,6 +1036,7 @@ env.Library(
'db_raii',
'dbdirectclient',
'exec/scoped_timer',
+ 'exec/sort_executor',
'exec/working_set',
'fts/base_fts',
'index/index_descriptor',
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index a790d0e21a1..7ee2806711e 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -28,6 +28,22 @@ env.Library(
],
)
+# Build the 'sort_executor' library from sort_executor.cpp using a clone of 'env'
+# (sortExecutorEnv) rather than 'env' itself; the snappy third-party settings are injected into
+# the clone.
+# NOTE(review): snappy is presumably needed because sort_executor.cpp textually includes
+# sorter.cpp -- confirm against the sorter sources.
+sortExecutorEnv = env.Clone()
+sortExecutorEnv.InjectThirdParty(libraries=['snappy'])
+sortExecutorEnv.Library(
+ target="sort_executor",
+ source=[
+ "sort_executor.cpp",
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/query/sort_pattern',
+ '$BUILD_DIR/mongo/db/storage/encryption_hooks',
+ '$BUILD_DIR/mongo/db/storage/storage_options',
+ '$BUILD_DIR/mongo/s/is_mongos',
+ '$BUILD_DIR/third_party/shim_snappy',
+ ],
+)
+
env.Library(
target='stagedebug_cmd',
source=[
diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp
new file mode 100644
index 00000000000..8fd20cfda46
--- /dev/null
+++ b/src/mongo/db/exec/sort_executor.cpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/sort_executor.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+namespace {
+/**
+ * Returns a new temporary-file name of the form "extsort-sort-executor.<N>" on each call, where
+ * <N> is taken from a function-local static atomic counter via fetchAndAdd(1).
+ *
+ * Each user of the Sorter must implement this function so that the temporary files produced by its
+ * Sorter instances are uniquely identified: every user picks its own file name prefix and keeps
+ * its own atomic counter. This is necessary because the sorter.cpp code is separately included in
+ * multiple places (this file includes it at the bottom), rather than compiled in one place and
+ * linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> sortExecutorFileCounter;
+ return "extsort-sort-executor." + std::to_string(sortExecutorFileCounter.fetchAndAdd(1));
+}
+} // namespace
+
+/**
+ * Stores the sort configuration. No sorter is created here; the DocumentSorter is built lazily
+ * by add() or loadingDone().
+ *
+ * 'sortPattern' and 'tempDir' are taken by value and moved into the corresponding members.
+ * 'limit' of 0 means "no limit": makeSortOptions() only forwards a non-zero limit.
+ * 'maxMemoryUsageBytes' is forwarded unchanged to the sorter options.
+ * 'allowDiskUse' decides whether makeSortOptions() enables external sorting in 'tempDir'.
+ */
+SortExecutor::SortExecutor(SortPattern sortPattern,
+ uint64_t limit,
+ uint64_t maxMemoryUsageBytes,
+ std::string tempDir,
+ bool allowDiskUse)
+ : _sortPattern(std::move(sortPattern)),
+ _limit(limit),
+ _maxMemoryUsageBytes(maxMemoryUsageBytes),
+ _tempDir(std::move(tempDir)),
+ _diskUseAllowed(allowDiskUse) {}
+
+/**
+ * Three-way comparison of two sorter entries. Only the sort keys (the 'first' member of each
+ * entry) are compared; the 'second' member is never read here.
+ *
+ * Returns the ValueComparator::compare() result for the first key part that differs, negated
+ * when that part of the sort pattern is not ascending, or 0 when no part differs.
+ */
+int SortExecutor::Comparator::operator()(const DocumentSorter::Data& lhs,
+ const DocumentSorter::Data& rhs) const {
+ Value lhsKey = lhs.first;
+ Value rhsKey = rhs.first;
+ // NOTE(review): the statement below was carried over from the removed
+ // DocumentSourceSort::compare(). Nothing in SortExecutor itself checks that the sort pattern is
+ // non-empty; DocumentSourceSort's constructor uasserts it, other callers should be confirmed.
+ // With an empty pattern (n == 0) the loop below does not run and this function returns 0.
+ //
+ // Original note: DocumentSourceSort::populate() has already guaranteed that the sort key is
+ // non-empty. However, the tricky part is deciding what to do if none of the sort keys are
+ // present. In that case, consider the document "less".
+ //
+ // Note that 'comparator' must use binary comparisons here, as both 'lhs' and 'rhs' are
+ // collation comparison keys.
+ ValueComparator comparator;
+ const size_t n = _sort.size();
+ if (n == 1) { // simple fast case: the key Value is compared as a whole, not indexed
+ if (_sort[0].isAscending)
+ return comparator.compare(lhsKey, rhsKey);
+ else
+ return -comparator.compare(lhsKey, rhsKey);
+ }
+
+ // Compound sort: compare part by part in sort-pattern order and stop at the first difference.
+ // Presumably each key is an array Value with one element per sort-pattern part -- confirm
+ // against the code that builds the keys.
+ for (size_t i = 0; i < n; i++) {
+ int cmp = comparator.compare(lhsKey[i], rhsKey[i]);
+ if (cmp) {
+ // Flip the sign when this part of the pattern is descending.
+ if (!_sort[i].isAscending)
+ cmp = -cmp;
+
+ return cmp;
+ }
+ }
+
+ // If we got here, every part compared equal (or didn't exist), so the two entries are
+ // considered equal for the purposes of this sort.
+ return 0;
+}
+
+/**
+ * Returns the next document in sorted order, or boost::none once the sorted output is
+ * exhausted. Only the 'second' member of the iterator's entry (the document) is returned.
+ *
+ * NOTE(review): '_output' is assigned only in loadingDone(), and it is dereferenced below
+ * without a null check (the removed DocumentSourceSort::getNext() tested '!_output' first).
+ * Calling getNext() before loadingDone() would therefore dereference a null pointer -- confirm
+ * that every caller respects the add() / loadingDone() / getNext() order.
+ */
+boost::optional<Document> SortExecutor::getNext() {
+ // Once exhaustion has been observed, keep answering "none" without touching '_output'.
+ if (_isEOF) {
+ return boost::none;
+ }
+
+ if (!_output->more()) {
+ // Release the output iterator as soon as it is drained and latch the EOF state.
+ _output.reset();
+ _isEOF = true;
+ return boost::none;
+ }
+
+ return _output->next().second;
+}
+
+/**
+ * Hands one (sort key, document) pair to the sorter. The sorter is created on first use with
+ * the options from makeSortOptions() and a Comparator bound to '_sortPattern'; because the
+ * options are computed at that moment, a limit set earlier through setLimit() is included.
+ *
+ * NOTE(review): loadingDone() resets '_sorter', so calling add() after loadingDone() would
+ * silently create a fresh sorter here rather than fail -- confirm no caller does that.
+ */
+void SortExecutor::add(Value sortKey, Document data) {
+ if (!_sorter) {
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
+ }
+ _sorter->add(std::move(sortKey), std::move(data));
+}
+
+/**
+ * Ends the loading phase: takes the result of the sorter's done() as the output iterator that
+ * getNext() reads from, records whether the sorter used disk, and then releases the sorter.
+ */
+void SortExecutor::loadingDone() {
+ // This conditional should only pass if no documents were added to the sorter. A sorter is
+ // still created in that case so that done() can be called on it below.
+ if (!_sorter) {
+ _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
+ }
+ _output.reset(_sorter->done());
+ // Sticky flag: once set it is never cleared by a later call.
+ _wasDiskUsed = _wasDiskUsed || _sorter->usedDisk();
+ _sorter.reset();
+}
+
+/**
+ * Builds the SortOptions used to create the sorter from the current member values:
+ *  - 'limit' is set only when '_limit' is non-zero (zero means "no limit");
+ *  - 'maxMemoryUsageBytes' is always set;
+ *  - 'extSortAllowed' and 'tempDir' are set only when disk use is allowed.
+ *
+ * NOTE(review): the removed DocumentSourceSort::makeSortOptions() enabled external sorting only
+ * when 'allowDiskUse && !inMongos'. Here the decision rests solely on '_diskUseAllowed', and
+ * DocumentSourceSort's constructor passes 'pExpCtx->allowDiskUse' unchanged -- confirm that the
+ * mongos case is handled elsewhere.
+ */
+SortOptions SortExecutor::makeSortOptions() const {
+ SortOptions opts;
+ if (_limit) {
+ opts.limit = _limit;
+ }
+
+ opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
+ if (_diskUseAllowed) {
+ opts.extSortAllowed = true;
+ opts.tempDir = _tempDir;
+ }
+
+ return opts;
+}
+} // namespace mongo
+
+#include "mongo/db/sorter/sorter.cpp"
+// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h
new file mode 100644
index 00000000000..95741f41256
--- /dev/null
+++ b/src/mongo/db/exec/sort_executor.h
@@ -0,0 +1,118 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/query/sort_pattern.h"
+#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+/**
+ * The SortExecutor class is the internal implementation of sorting for query execution. The
+ * caller should provide input documents by repeated calls to the add() function, and then
+ * complete the loading process with a single call to loadingDone(). Finally, getNext() should be
+ * called to return the documents one by one in sorted order.
+ *
+ * The underlying sorter is created lazily (on the first add(), or in loadingDone() if nothing
+ * was added), using the limit, memory budget and disk settings held by this object at that time.
+ */
+class SortExecutor {
+public:
+ /**
+ * If the passed in limit is 0, this is treated as no limit.
+ *
+ * 'maxMemoryUsageBytes' is forwarded to the sorter options. 'tempDir' is forwarded only when
+ * 'allowDiskUse' is true, in which case external sorting is enabled as well.
+ */
+ SortExecutor(SortPattern sortPattern,
+ uint64_t limit,
+ uint64_t maxMemoryUsageBytes,
+ std::string tempDir,
+ bool allowDiskUse);
+
+ /**
+ * Returns the next document in sorted order, or boost::none when the output is exhausted.
+ * Must be called only after loadingDone().
+ */
+ boost::optional<Document> getNext();
+
+ /**
+ * Returns the sort pattern this executor was constructed with.
+ */
+ const SortPattern& sortPattern() const {
+ return _sortPattern;
+ }
+
+ /**
+ * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times; the smallest
+ * limit is kept.
+ *
+ * NOTE(review): because 0 means "no limit", calling setLimit(0) on an executor that already
+ * has a non-zero limit satisfies 'limit < _limit' and stores 0, i.e. it removes the limit
+ * instead of keeping the smallest one. Confirm that callers never pass 0.
+ */
+ void setLimit(uint64_t limit) {
+ if (!_limit || limit < _limit)
+ _limit = limit;
+ }
+
+ /**
+ * Returns the stored limit; 0 means that no limit is set.
+ */
+ uint64_t getLimit() const {
+ return _limit;
+ }
+
+ /**
+ * Returns true when a non-zero limit is set.
+ */
+ bool hasLimit() const {
+ return _limit > 0;
+ }
+
+ /**
+ * Returns whether a sorter finished by loadingDone() reported that it used disk.
+ */
+ bool wasDiskUsed() const {
+ return _wasDiskUsed;
+ }
+
+ /**
+ * Signals to the sort executor that there will be no more input documents.
+ */
+ void loadingDone();
+
+ /**
+ * Add a Document with sort key specified by Value to the DocumentSorter.
+ */
+ void add(Value, Document);
+
+private:
+ using DocumentSorter = Sorter<Value, Document>;
+
+ /**
+ * Comparison functor handed to the DocumentSorter. It orders entries by their sort key
+ * according to the referenced SortPattern.
+ *
+ * NOTE(review): '_sort' is a reference, and add()/loadingDone() bind it to this executor's
+ * '_sortPattern' member. The Comparator must therefore not outlive the SortExecutor, and
+ * moving the SortExecutor while a sorter exists would presumably leave the reference
+ * dangling -- confirm how DocumentSorter stores the comparator.
+ */
+ class Comparator {
+ public:
+ Comparator(const SortPattern& sortPattern) : _sort(sortPattern) {}
+ int operator()(const DocumentSorter::Data& lhs, const DocumentSorter::Data& rhs) const;
+
+ private:
+ const SortPattern& _sort;
+ };
+
+ /**
+ * Builds the SortOptions for a new sorter from the current limit, memory and disk settings.
+ */
+ SortOptions makeSortOptions() const;
+
+ SortPattern _sortPattern;
+ // A limit of zero is defined as no limit.
+ uint64_t _limit;
+ uint64_t _maxMemoryUsageBytes;
+ std::string _tempDir;
+ bool _diskUseAllowed = false;
+
+ // Non-null only while documents are being loaded; created lazily and reset by loadingDone().
+ std::unique_ptr<DocumentSorter> _sorter;
+ // Sorted output, set by loadingDone() and released by getNext() once it is drained.
+ std::unique_ptr<DocumentSorter::Iterator> _output;
+
+ // Set by getNext() when the output has been exhausted.
+ bool _isEOF = false;
+ // Set by loadingDone() when the sorter reports disk use; never cleared afterwards.
+ bool _wasDiskUsed = false;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 984bdbdf89c..2d71d9dfadc 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -269,9 +269,9 @@ env.Library(
],
)
-pipelineeEnv = env.Clone()
-pipelineeEnv.InjectThirdParty(libraries=['snappy'])
-pipelineeEnv.Library(
+pipelineEnv = env.Clone()
+pipelineEnv.InjectThirdParty(libraries=['snappy'])
+pipelineEnv.Library(
target='pipeline',
source=[
'document_source.cpp',
@@ -330,6 +330,7 @@ pipelineeEnv.Library(
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
+ '$BUILD_DIR/mongo/db/exec/sort_executor',
'$BUILD_DIR/mongo/db/generic_cursor',
'$BUILD_DIR/mongo/db/index/key_generator',
'$BUILD_DIR/mongo/db/logical_session_cache',
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index da86580ef02..ecb3bf08007 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -397,7 +397,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) {
const int numBuckets = 2;
auto bucketAutoStage = DocumentSourceBucketAuto::create(
expCtx, groupByExpression, numBuckets, {}, nullptr, maxMemoryUsageBytes);
- auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes);
string largeStr(maxMemoryUsageBytes, 'x');
auto mock =
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 670f6165277..af89701d885 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -96,28 +96,27 @@ Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) {
return Value{std::move(keys)};
}
-/**
- * Generates a new file name on each call using a static, atomic and monotonically increasing
- * number.
- *
- * Each user of the Sorter must implement this function to ensure that all temporary files that the
- * Sorter instances produce are uniquely identified using a unique file name extension with separate
- * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
- * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
- */
-std::string nextFileName() {
- static AtomicWord<unsigned> documentSourceSortFileCounter;
- return "extsort-doc-source-sort." +
- std::to_string(documentSourceSortFileCounter.fetchAndAdd(1));
-}
-
} // namespace
constexpr StringData DocumentSourceSort::kStageName;
-DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx,
- const BSONObj& sortOrder)
- : DocumentSource(pExpCtx), _rawSort(sortOrder), _sortPattern({_rawSort, pExpCtx}) {}
+DocumentSourceSort::DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const BSONObj& sortOrder,
+ uint64_t limit,
+ uint64_t maxMemoryUsageBytes)
+ : DocumentSource(pExpCtx),
+ _sortExecutor({{sortOrder, pExpCtx},
+ limit,
+ maxMemoryUsageBytes,
+ pExpCtx->tempDir,
+ pExpCtx->allowDiskUse}),
+ // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort
+ // by a metadata field.
+ _sortKeyGen({sortOrder, pExpCtx->getCollator()}) {
+ uassert(15976,
+ "$sort stage must have at least one sort key",
+ !_sortExecutor->sortPattern().empty());
+}
REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
@@ -134,39 +133,37 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() {
invariant(populationResult.isEOF());
}
- if (!_output || !_output->more()) {
- dispose();
+ auto result = _sortExecutor->getNext();
+ if (!result)
return GetNextResult::makeEOF();
- }
-
- return _output->next().second;
+ return GetNextResult(std::move(*result));
}
void DocumentSourceSort::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ uint64_t limit = _sortExecutor->getLimit();
if (explain) { // always one Value for combined $sort + $limit
array.push_back(Value(DOC(
- kStageName << DOC(
- "sortKey" << _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain)
- << "limit"
- << (_limitSrc ? Value(_limitSrc->getLimit()) : Value())))));
+ kStageName << DOC("sortKey"
+ << _sortExecutor->sortPattern().serialize(
+ SortPattern::SortKeySerialization::kForExplain)
+ << "limit"
+ << (_sortExecutor->hasLimit() ? Value(static_cast<long long>(limit))
+ : Value())))));
} else { // one Value for $sort and maybe a Value for $limit
- MutableDocument inner(
- _sortPattern.serialize(SortPattern::SortKeySerialization::kForPipelineSerialization));
+ MutableDocument inner(_sortExecutor->sortPattern().serialize(
+ SortPattern::SortKeySerialization::kForPipelineSerialization));
array.push_back(Value(DOC(kStageName << inner.freeze())));
- if (_limitSrc) {
- _limitSrc->serializeToArray(array);
+ if (_sortExecutor->hasLimit()) {
+ auto limitSrc = DocumentSourceLimit::create(pExpCtx, limit);
+ limitSrc->serializeToArray(array);
}
}
}
-void DocumentSourceSort::doDispose() {
- _output.reset();
-}
-
long long DocumentSourceSort::getLimit() const {
- return _limitSrc ? _limitSrc->getLimit() : -1;
+ return _sortExecutor->hasLimit() ? _sortExecutor->getLimit() : -1;
}
Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
@@ -182,14 +179,13 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
int64_t safeSum = 0;
// The skip and limit values can be very large, so we need to make sure the sum doesn't
- // overflow before applying an optimiztion to pull the limit into the sort stage.
+ // overflow before applying an optimization to pull the limit into the sort stage.
if (nextSkip && !mongoSignedAddOverflow64(skipSum, nextSkip->getSkip(), &safeSum)) {
skipSum = safeSum;
++stageItr;
} else if (nextLimit &&
!mongoSignedAddOverflow64(nextLimit->getLimit(), skipSum, &safeSum)) {
- nextLimit->setLimit(safeSum);
- setLimitSrc(nextLimit);
+ _sortExecutor->setLimit(safeSum);
container->erase(stageItr);
stageItr = std::next(itr);
skipSum = 0;
@@ -204,7 +200,7 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
}
DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const {
- for (auto&& keyPart : _sortPattern) {
+ for (auto&& keyPart : _sortExecutor->sortPattern()) {
if (keyPart.expression) {
keyPart.expression->addDependencies(deps);
} else {
@@ -229,46 +225,17 @@ intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson(
intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
const intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj sortOrder,
- long long limit,
+ uint64_t limit,
boost::optional<uint64_t> maxMemoryUsageBytes) {
- intrusive_ptr<DocumentSourceSort> pSort(new DocumentSourceSort(pExpCtx, sortOrder.getOwned()));
- pSort->_maxMemoryUsageBytes = maxMemoryUsageBytes
+ auto resolvedMaxBytes = maxMemoryUsageBytes
? *maxMemoryUsageBytes
: internalDocumentSourceSortMaxBlockingSortBytes.load();
-
- uassert(15976, "$sort stage must have at least one sort key", !pSort->_sortPattern.empty());
-
- pSort->_sortKeyGen = SortKeyGenerator{
- // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort
- // by a metadata field.
- pSort->_sortPattern.serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)
- .toBson(),
- pExpCtx->getCollator()};
-
- if (limit > 0) {
- pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit));
- }
+ intrusive_ptr<DocumentSourceSort> pSort(
+ new DocumentSourceSort(pExpCtx, sortOrder.getOwned(), limit, resolvedMaxBytes));
return pSort;
}
-SortOptions DocumentSourceSort::makeSortOptions() const {
- /* make sure we've got a sort key */
- verify(_sortPattern.size());
-
- SortOptions opts;
- if (_limitSrc)
- opts.limit = _limitSrc->getLimit();
-
- opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
- opts.extSortAllowed = true;
- opts.tempDir = pExpCtx->tempDir;
- }
-
- return opts;
-}
-
DocumentSource::GetNextResult DocumentSourceSort::populate() {
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
@@ -282,9 +249,6 @@ DocumentSource::GetNextResult DocumentSourceSort::populate() {
void DocumentSourceSort::loadDocument(Document&& doc) {
invariant(!_populated);
- if (!_sorter) {
- _sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this)));
- }
Value sortKey;
Document docForSorter;
@@ -292,21 +256,16 @@ void DocumentSourceSort::loadDocument(Document&& doc) {
// already computed the sort key we'd have split the pipeline there, would be merging presorted
// documents, and wouldn't use this method.
std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc));
- _sorter->add(sortKey, docForSorter);
+ _sortExecutor->add(sortKey, docForSorter);
}
void DocumentSourceSort::loadingDone() {
- if (!_sorter) {
- _sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this)));
- }
- _output.reset(_sorter->done());
- _usedDisk = _sorter->usedDisk() || _usedDisk;
- _sorter.reset();
+ _sortExecutor->loadingDone();
_populated = true;
}
bool DocumentSourceSort::usedDisk() {
- return _usedDisk;
+ return _sortExecutor->wasDiskUsed();
}
Value DocumentSourceSort::getCollationComparisonKey(const Value& val) const {
@@ -358,13 +317,13 @@ StatusWith<Value> DocumentSourceSort::extractKeyPart(
}
StatusWith<Value> DocumentSourceSort::extractKeyFast(const Document& doc) const {
- if (_sortPattern.size() == 1u) {
- return extractKeyPart(doc, _sortPattern[0]);
+ if (_sortExecutor->sortPattern().size() == 1u) {
+ return extractKeyPart(doc, _sortExecutor->sortPattern()[0]);
}
vector<Value> keys;
- keys.reserve(_sortPattern.size());
- for (auto&& keyPart : _sortPattern) {
+ keys.reserve(_sortExecutor->sortPattern().size());
+ for (auto&& keyPart : _sortExecutor->sortPattern()) {
auto extractedKey = extractKeyPart(doc, keyPart);
if (!extractedKey.isOK()) {
// We can't use the fast path, so bail out.
@@ -387,7 +346,7 @@ BSONObj DocumentSourceSort::extractKeyWithArray(const Document& doc) const {
// Convert the Document to a BSONObj, but only do the conversion for the paths we actually need.
// Then run the result through the SortKeyGenerator to obtain the final sort key.
- auto bsonDoc = _sortPattern.documentToBsonWithSortPaths(doc);
+ auto bsonDoc = _sortExecutor->sortPattern().documentToBsonWithSortPaths(doc);
return uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata));
}
@@ -403,7 +362,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
if (fastKey.isOK()) {
inMemorySortKey = std::move(fastKey.getValue());
if (pExpCtx->needsMerge) {
- serializedSortKey = serializeSortKey(_sortPattern.size(), inMemorySortKey);
+ serializedSortKey =
+ serializeSortKey(_sortExecutor->sortPattern().size(), inMemorySortKey);
}
} else {
// We have to do it the slow way - through the sort key generator. This will generate a BSON
@@ -411,7 +371,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
// representation into the corresponding array of keys as a Value. BSONObj {'': 1, '': [2,
// 3]} becomes Value [1, [2, 3]].
serializedSortKey = extractKeyWithArray(doc);
- inMemorySortKey = deserializeSortKey(_sortPattern.size(), *serializedSortKey);
+ inMemorySortKey =
+ deserializeSortKey(_sortExecutor->sortPattern().size(), *serializedSortKey);
}
MutableDocument toBeSorted(std::move(doc));
@@ -424,48 +385,14 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
return {inMemorySortKey, toBeSorted.freeze()};
}
-int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
- // DocumentSourceSort::populate() has already guaranteed that the sort key is non-empty.
- // However, the tricky part is deciding what to do if none of the sort keys are present. In that
- // case, consider the document "less".
- //
- // Note that 'comparator' must use binary comparisons here, as both 'lhs' and 'rhs' are
- // collation comparison keys.
- ValueComparator comparator;
- const size_t n = _sortPattern.size();
- if (n == 1) { // simple fast case
- if (_sortPattern[0].isAscending)
- return comparator.compare(lhs, rhs);
- else
- return -comparator.compare(lhs, rhs);
- }
-
- // compound sort
- for (size_t i = 0; i < n; i++) {
- int cmp = comparator.compare(lhs[i], rhs[i]);
- if (cmp) {
- /* if necessary, adjust the return value by the key ordering */
- if (!_sortPattern[i].isAscending)
- cmp = -cmp;
-
- return cmp;
- }
- }
-
- /*
- If we got here, everything matched (or didn't exist), so we'll
- consider the documents equal for purposes of this sort.
- */
- return 0;
-}
-
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() {
DistributedPlanLogic split;
split.shardsStage = this;
- split.inputSortPattern =
- _sortPattern.serialize(SortPattern::SortKeySerialization::kForSortKeyMerging).toBson();
- if (_limitSrc) {
- split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit());
+ split.inputSortPattern = _sortExecutor->sortPattern()
+ .serialize(SortPattern::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ if (_sortExecutor->hasLimit()) {
+ split.mergingStage = DocumentSourceLimit::create(pExpCtx, getLimit());
}
return split;
}
@@ -482,6 +409,3 @@ bool DocumentSourceSort::canRunInParallelBeforeWriteStage(
}
} // namespace mongo
-
-#include "mongo/db/sorter/sorter.cpp"
-// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 519b61a65e8..44797e3dcf3 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/sort_executor.h"
#include "mongo/db/index/sort_key_generator.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_limit.h"
@@ -69,7 +70,7 @@ public:
ChangeStreamRequirement::kBlacklist);
// Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
- constraints.canSwapWithMatch = !_limitSrc;
+ constraints.canSwapWithMatch = !_sortExecutor->hasLimit();
return constraints;
}
@@ -83,7 +84,7 @@ public:
* Returns the sort key pattern.
*/
const SortPattern& getSortKeyPattern() const {
- return _sortPattern;
+ return _sortExecutor->sortPattern();
}
/**
@@ -99,7 +100,7 @@ public:
static boost::intrusive_ptr<DocumentSourceSort> create(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj sortOrder,
- long long limit = -1,
+ uint64_t limit = 0,
boost::optional<uint64_t> maxMemoryUsageBytes = boost::none);
/**
@@ -125,45 +126,26 @@ public:
*/
bool usedDisk() final;
- /**
- * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
- * have already been sorted.
- */
- void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
-
bool isPopulated() {
return _populated;
};
- boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
- return _limitSrc;
+ bool hasLimit() const {
+ return _sortExecutor->hasLimit();
}
protected:
/**
- * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort.
+ * Attempts to absorb a subsequent $limit stage so that it can perform a top-k sort.
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
- void doDispose() final;
private:
- using MySorter = Sorter<Value, Document>;
-
- // For MySorter.
- class Comparator {
- public:
- explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
- int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
- return _source.compare(lhs.first, rhs.first);
- }
-
- private:
- const DocumentSourceSort& _source;
- };
-
- explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- const BSONObj& sortOrder);
+ DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const BSONObj& sortOrder,
+ uint64_t limit,
+ uint64_t maxMemoryUsageBytes);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
MONGO_UNREACHABLE; // Should call serializeToArray instead.
@@ -179,8 +161,6 @@ private:
*/
GetNextResult populate();
- SortOptions makeSortOptions() const;
-
/**
* Returns the sort key for 'doc', as well as the document that should be entered into the
* sorter to eventually be returned. If we will need to later merge the sorted results with
@@ -218,33 +198,11 @@ private:
*/
Value getCollationComparisonKey(const Value& val) const;
- int compare(const Value& lhs, const Value& rhs) const;
-
- /**
- * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep
- * the smallest limit.
- */
- void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) {
- if (!_limitSrc || limit->getLimit() < _limitSrc->getLimit()) {
- _limitSrc = limit;
- }
- }
-
bool _populated = false;
- BSONObj _rawSort;
+ boost::optional<SortExecutor> _sortExecutor;
boost::optional<SortKeyGenerator> _sortKeyGen;
-
- SortPattern _sortPattern;
-
- boost::intrusive_ptr<DocumentSourceLimit> _limitSrc;
-
- uint64_t _maxMemoryUsageBytes;
- bool _done;
- std::unique_ptr<MySorter> _sorter;
- std::unique_ptr<MySorter::Iterator> _output;
- bool _usedDisk = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index b73d90a2d1a..00a4dad3c70 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -403,7 +403,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
- auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes);
string largeStr(maxMemoryUsageBytes, 'x');
auto mock =
@@ -438,7 +438,7 @@ TEST_F(DocumentSourceSortExecutionTest,
expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
- auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes);
string largeStr(maxMemoryUsageBytes, 'x');
auto mock = DocumentSourceMock::createForTest({Document{{"_id", 0}, {"largeStr", largeStr}},
@@ -453,7 +453,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPa
expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
- auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
+ auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), 0, maxMemoryUsageBytes);
string largeStr(maxMemoryUsageBytes / 2, 'x');
auto mock =
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 3a27c6ee73f..d6e3557bcd9 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -414,7 +414,7 @@ getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) {
if (sourcesIt != sources.end()) {
sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get());
if (sortStage) {
- if (!sortStage->getLimitSrc()) {
+ if (!sortStage->hasLimit()) {
++sourcesIt;
} else {
// This $sort stage was previously followed by a $limit stage.
@@ -664,7 +664,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
if (swExecutorGrouped.isOK()) {
// Any $limit stage before the $group stage should make the pipeline ineligible for this
// optimization.
- invariant(!sortStage || !sortStage->getLimitSrc());
+ invariant(!sortStage || !sortStage->hasLimit());
// We remove the $sort and $group stages that begin the pipeline, because the executor
// will handle the sort, and the groupTransform (added below) will handle the $group
@@ -751,9 +751,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// We know the sort is being handled by the query system, so remove the $sort stage.
pipeline->_sources.pop_front();
- if (sortStage->getLimitSrc()) {
+ if (sortStage->hasLimit()) {
// We need to reinsert the coalesced $limit after removing the $sort.
- pipeline->_sources.push_front(sortStage->getLimitSrc());
+ pipeline->_sources.push_front(
+ DocumentSourceLimit::create(expCtx, sortStage->getLimit()));
}
return std::move(exec);
} else if (swExecutorSort == ErrorCodes::QueryPlanKilled) {
diff --git a/src/mongo/db/query/sort_pattern.cpp b/src/mongo/db/query/sort_pattern.cpp
index 506ee856805..3d1fe685d04 100644
--- a/src/mongo/db/query/sort_pattern.cpp
+++ b/src/mongo/db/query/sort_pattern.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/query/sort_pattern.h"
namespace mongo {
diff --git a/src/mongo/db/query/sort_pattern.h b/src/mongo/db/query/sort_pattern.h
index a8d57efd114..aa6cd2a1564 100644
--- a/src/mongo/db/query/sort_pattern.h
+++ b/src/mongo/db/query/sort_pattern.h
@@ -33,10 +33,8 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/expression.h"
-#include "mongo/db/pipeline/field_path.h"
namespace mongo {
-
class SortPattern {
public:
enum class SortKeySerialization {