/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/pipeline/document_source_sort.h"

#include <algorithm>

#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/document_comparator.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/skip_and_limit.h"
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/s/query/document_source_merge_cursors.h"

namespace mongo {

using boost::intrusive_ptr;
using std::make_pair;
using std::string;
using std::unique_ptr;
using std::vector;

constexpr StringData DocumentSourceSort::kStageName;

DocumentSourceSort::DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
                                       const SortPattern& sortOrder,
                                       uint64_t limit,
                                       uint64_t maxMemoryUsageBytes)
    : DocumentSource(kStageName, pExpCtx),
      _sortExecutor(
          {sortOrder, limit, maxMemoryUsageBytes, pExpCtx->tempDir, pExpCtx->allowDiskUse}),
      // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort
      // by a metadata field.
      _sortKeyGen({sortOrder, pExpCtx->getCollator()}) {
    uassert(15976,
            "$sort stage must have at least one sort key",
            !_sortExecutor->sortPattern().empty());
}

REGISTER_DOCUMENT_SOURCE(sort,
                         LiteParsedDocumentSourceDefault::parse,
                         DocumentSourceSort::createFromBson,
                         LiteParsedDocumentSource::AllowedWithApiStrict::kAlways);

DocumentSource::GetNextResult DocumentSourceSort::doGetNext() {
    if (!_populated) {
        const auto populationResult = populate();
        if (populationResult.isPaused()) {
            return populationResult;
        }
        invariant(populationResult.isEOF());
    }

    if (!_sortExecutor->hasNext()) {
        return GetNextResult::makeEOF();
    }

    return GetNextResult{_sortExecutor->getNext().second};
}
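// Without an explain verbosity, $sort round-trips as its sort pattern, followed by a separate
// $limit stage when a limit has been pushed down into this stage. Under explain, the limit is
// instead reported inside the $sort document, and execution-stats verbosities add sorter metrics.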
void DocumentSourceSort::serializeToArray(
    std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
    uint64_t limit = _sortExecutor->getLimit();

    if (!explain) {  // one Value for $sort and maybe a Value for $limit
        MutableDocument inner(_sortExecutor->sortPattern().serialize(
            SortPattern::SortKeySerialization::kForPipelineSerialization));
        array.push_back(Value(DOC(kStageName << inner.freeze())));

        if (_sortExecutor->hasLimit()) {
            auto limitSrc = DocumentSourceLimit::create(pExpCtx, limit);
            limitSrc->serializeToArray(array);
        }
        return;
    }

    MutableDocument mutDoc(
        DOC(kStageName << DOC("sortKey"
                              << _sortExecutor->sortPattern().serialize(
                                     SortPattern::SortKeySerialization::kForExplain)
                              << "limit"
                              << (_sortExecutor->hasLimit() ? Value(static_cast<long long>(limit))
                                                            : Value()))));

    if (explain >= ExplainOptions::Verbosity::kExecStats) {
        auto& stats = _sortExecutor->stats();

        mutDoc["totalDataSizeSortedBytesEstimate"] =
            Value(static_cast<long long>(stats.totalDataSizeBytes));
        mutDoc["usedDisk"] = Value(stats.spills > 0 ? true : false);
    }

    array.push_back(Value(mutDoc.freeze()));
}

boost::optional<long long> DocumentSourceSort::getLimit() const {
    return _sortExecutor->hasLimit() ? boost::optional<long long>{_sortExecutor->getLimit()}
                                     : boost::none;
}

Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    invariant(*itr == this);

    auto stageItr = std::next(itr);
    auto limit = extractLimitForPushdown(stageItr, container);
    if (limit)
        _sortExecutor->setLimit(*limit);

    auto nextStage = std::next(itr);
    if (nextStage == container->end()) {
        return container->end();
    }

    limit = getLimit();

    // Since $sort is not guaranteed to be stable, we can blindly remove the first $sort only when
    // there's no limit on the current sort.
    auto nextSort = dynamic_cast<DocumentSourceSort*>((*nextStage).get());
    if (!limit && nextSort) {
        container->erase(itr);
        return nextStage;
    }

    if (limit && nextSort) {
        // If there's a limit between two adjacent sorts with the same key pattern it's safe to
        // merge the two sorts and take the minimum of the limits.
        if (dynamic_cast<DocumentSourceSort*>((*itr).get())->getSortKeyPattern() ==
            nextSort->getSortKeyPattern()) {
            // When coalescing subsequent $sort stages, the existing/lower limit is retained in
            // 'setLimit'.
            nextSort->_sortExecutor->setLimit(*limit);
            container->erase(itr);
        }
    }
    return nextStage;
}

DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const {
    for (auto&& keyPart : _sortExecutor->sortPattern()) {
        if (keyPart.expression) {
            keyPart.expression->addDependencies(deps);
        } else {
            deps->fields.insert(keyPart.fieldPath->fullPath());
        }
    }
    if (pExpCtx->needsMerge) {
        // Include the sort key if we will merge several sorted streams later.
        deps->setNeedsMetadata(DocumentMetadataFields::kSortKey, true);
    }

    return DepsTracker::State::SEE_NEXT;
}

intrusive_ptr<DocumentSource> DocumentSourceSort::createFromBson(
    BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
    uassert(15973, "the $sort key specification must be an object", elem.type() == Object);
    return create(pExpCtx, elem.embeddedObject());
}

intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
    const intrusive_ptr<ExpressionContext>& pExpCtx,
    const SortPattern& sortOrder,
    uint64_t limit,
    boost::optional<uint64_t> maxMemoryUsageBytes) {
    auto resolvedMaxBytes = maxMemoryUsageBytes
        ? *maxMemoryUsageBytes
        : internalQueryMaxBlockingSortMemoryUsageBytes.load();
    intrusive_ptr<DocumentSourceSort> pSort(
        new DocumentSourceSort(pExpCtx, sortOrder, limit, resolvedMaxBytes));
    return pSort;
}

DocumentSource::GetNextResult DocumentSourceSort::populate() {
    auto nextInput = pSource->getNext();
    for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
        loadDocument(nextInput.releaseDocument());
    }
    if (nextInput.isEOF()) {
        loadingDone();
    }
    return nextInput;
}

void DocumentSourceSort::loadDocument(Document&& doc) {
    invariant(!_populated);

    Value sortKey;
    Document docForSorter;
    // We always need to extract the sort key if we've reached this point. If the query system had
    // already computed the sort key we'd have split the pipeline there, would be merging presorted
    // documents, and wouldn't use this method.
    std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc));
    _sortExecutor->add(sortKey, docForSorter);
}

void DocumentSourceSort::loadingDone() {
    _sortExecutor->loadingDone();
    auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx);
    metricsCollector.incrementKeysSorted(_sortExecutor->stats().keysSorted);
    metricsCollector.incrementSorterSpills(_sortExecutor->stats().spills);
    _populated = true;
}

bool DocumentSourceSort::usedDisk() {
    return _sortExecutor->wasDiskUsed();
}

std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) const {
    Value sortKey = _sortKeyGen->computeSortKeyFromDocument(doc);

    if (pExpCtx->needsMerge) {
        // If this sort stage is part of a merged pipeline, make sure that each Document's sort key
        // gets saved with its metadata.
        MutableDocument toBeSorted(std::move(doc));
        toBeSorted.metadata().setSortKey(sortKey, _sortKeyGen->isSingleElementKey());

        return std::make_pair(std::move(sortKey), toBeSorted.freeze());
    } else {
        return std::make_pair(std::move(sortKey), std::move(doc));
    }
}
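// For sharded execution, the $sort itself runs on each shard; 'inputSortPattern' tells the
// merging half of the pipeline how to merge the presorted streams, and any pushed-down limit
// becomes a $limit on the merging side.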
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() {
    DistributedPlanLogic split;
    split.shardsStage = this;
    split.inputSortPattern = _sortExecutor->sortPattern()
                                 .serialize(SortPattern::SortKeySerialization::kForSortKeyMerging)
                                 .toBson();
    if (auto limit = getLimit()) {
        split.mergingStage = DocumentSourceLimit::create(pExpCtx, *limit);
    }
    return split;
}

bool DocumentSourceSort::canRunInParallelBeforeWriteStage(
    const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
    // This is an interesting special case. If there are no further stages which require merging the
    // streams into one, a $sort should not require it. This is only the case because the sort order
    // doesn't matter for a pipeline ending with a write stage. We may encounter it here as an
    // intermediate stage before a final $group with a $sort, which would make sense. Should we
    // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort
    // would generally require merging the streams before producing output.
    return false;
}

}  // namespace mongo