From 6d0a0e46b987dbc69a5694ff03d53d7e7e25b065 Mon Sep 17 00:00:00 2001 From: David Storch Date: Tue, 1 Oct 2019 19:36:43 +0000 Subject: SERVER-42182 Reimplement SortStage in terms of SortExecutor. This consolidates the implementations of DocumentSourceSort and SortStage to both use the same underlying sort execution code. It also allows a future change to expose external sort for find command (which currently requires 'enableTestCommands=true'). --- src/mongo/db/exec/plan_stats.h | 23 ++-- src/mongo/db/exec/sort.cpp | 265 +++++++----------------------------- src/mongo/db/exec/sort.h | 94 +++---------- src/mongo/db/exec/sort_executor.cpp | 13 ++ src/mongo/db/exec/sort_executor.h | 12 ++ src/mongo/db/exec/sort_test.cpp | 15 +- src/mongo/db/exec/working_set.h | 6 + 7 files changed, 120 insertions(+), 308 deletions(-) (limited to 'src/mongo/db/exec') diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index cff5085e47a..65cafb5d3bc 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -615,17 +615,24 @@ struct SortStats : public SpecificStats { return sortPattern.objsize() + sizeof(*this); } - // What's our current memory usage? - size_t memUsage = 0u; - - // What's our memory limit? - size_t memLimit = 0u; + // The pattern according to which we are sorting. + BSONObj sortPattern; // The number of results to return from the sort. - size_t limit = 0u; + uint64_t limit = 0u; - // The pattern according to which we are sorting. - BSONObj sortPattern; + // The maximum number of bytes of memory we're willing to use during execution of the sort. If + // this limit is exceeded and 'allowDiskUse' is false, the query will fail at execution time. If + // 'allowDiskUse' is true, the data will be spilled to disk. + uint64_t maxMemoryUsageBytes = 0u; + + // The amount of data we've sorted in bytes. At various times this data may be buffered in + // memory or disk-resident, depending on the configuration of 'maxMemoryUsageBytes' and whether + // disk use is allowed. + uint64_t totalDataSizeBytes = 0u; + + // Whether we spilled data to disk during the execution of this query. + bool wasDiskUsed = false; }; struct MergeSortStats : public SpecificStats { diff --git a/src/mongo/db/exec/sort.cpp b/src/mongo/db/exec/sort.cpp index c7c3cbe3ea7..eb5486b61ee 100644 --- a/src/mongo/db/exec/sort.cpp +++ b/src/mongo/db/exec/sort.cpp @@ -27,266 +27,93 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" #include "mongo/db/exec/sort.h" - -#include -#include - -#include "mongo/db/catalog/collection.h" -#include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" -#include "mongo/db/index/btree_key_generator.h" -#include "mongo/db/index_names.h" -#include "mongo/db/query/find_common.h" -#include "mongo/db/query/query_knobs_gen.h" -#include "mongo/db/query/query_planner.h" -#include "mongo/util/log.h" namespace mongo { -using std::endl; -using std::unique_ptr; -using std::vector; - -// static -const char* SortStage::kStageType = "SORT"; - -bool SortStage::WorkingSetComparator::operator()(const SortableDataItem& lhs, - const SortableDataItem& rhs) const { - int cmp = sortKeyComparator(lhs.sortKey, rhs.sortKey); - if (cmp != 0) { - return cmp < 0; - } else { - // Indexes use recordId as a final tiebreaker, and we do the same here, so that SortStage - // outputs documents in the same order that an index would have. - return lhs.recordId < rhs.recordId; - } -} - SortStage::SortStage(boost::intrusive_ptr expCtx, WorkingSet* ws, - BSONObj sortPattern, + SortPattern sortPattern, uint64_t limit, uint64_t maxMemoryUsageBytes, std::unique_ptr child) - : PlanStage(kStageType, expCtx->opCtx), + : PlanStage(kStageType.rawData(), expCtx->opCtx), _ws(ws), - _sortExecutor(SortPattern{sortPattern, expCtx}, + _sortExecutor(std::move(sortPattern), limit, maxMemoryUsageBytes, expCtx->tempDir, - expCtx->allowDiskUse), - _pattern(std::move(sortPattern)), - _limit(limit), - _sorted(false), - _resultIterator(_data.end()), - _memUsage(0) { + expCtx->allowDiskUse) { _children.emplace_back(std::move(child)); - - BSONObj sortComparator = FindCommon::transformSortSpec(_pattern); - _workingSetComparator = std::make_unique(sortComparator); - - // If limit > 1, we need to initialize _dataSet here to maintain ordered set of data items while - // fetching from the child stage. - if (_limit > 1) { - const WorkingSetComparator& cmp = *_workingSetComparator; - _dataSet.reset(new SortableDataItemSet(cmp)); - } -} - -bool SortStage::isEOF() { - // We're done when our child has no more results, we've sorted the child's results, and - // we've returned all sorted results. - return child()->isEOF() && _sorted && (_data.end() == _resultIterator); } PlanStage::StageState SortStage::doWork(WorkingSetID* out) { - const size_t maxBytes = static_cast(internalQueryExecMaxBlockingSortBytes.load()); - if (_memUsage > maxBytes) { - str::stream ss; - ss << "Sort operation used more than the maximum " << maxBytes - << " bytes of RAM. Add an index, or specify a smaller limit."; - Status status(ErrorCodes::OperationFailed, ss); - *out = WorkingSetCommon::allocateStatusMember(_ws, status); - return PlanStage::FAILURE; - } - if (isEOF()) { return PlanStage::IS_EOF; } - // Still reading in results to sort. - if (!_sorted) { + if (!_populated) { WorkingSetID id = WorkingSet::INVALID_ID; - StageState code = child()->work(&id); - - if (PlanStage::ADVANCED == code) { - WorkingSetMember* member = _ws->get(id); - - SortableDataItem item; - item.wsid = id; - - // We extract the sort key from the WSM's metadata. This must have been generated by a - // SortKeyGeneratorStage descendent in the execution tree. - item.sortKey = member->metadata().getSortKey(); - - if (member->hasRecordId()) { - // The RecordId breaks ties when sorting two WSMs with the same sort key. - item.recordId = member->recordId; + const StageState code = child()->work(&id); + + if (code == PlanStage::ADVANCED) { + // The plan must be structured such that a previous stage has attached the sort key + // metadata. + auto member = _ws->get(id); + invariant(member->metadata().hasSortKey()); + + auto&& extractedMember = _ws->extract(id); + + try { + auto sortKey = extractedMember.metadata().getSortKey(); + _sortExecutor.add(std::move(sortKey), std::move(extractedMember)); + } catch (const AssertionException&) { + // Propagate runtime errors using the FAILED status code. + *out = WorkingSetCommon::allocateStatusMember(_ws, exceptionToStatus()); + return PlanStage::FAILURE; } - addToBuffer(item); - return PlanStage::NEED_TIME; - } else if (PlanStage::IS_EOF == code) { - // TODO: We don't need the lock for this. We could ask for a yield and do this work - // unlocked. Also, this is performing a lot of work for one call to work(...) - sortBuffer(); - _resultIterator = _data.begin(); - _sorted = true; + } else if (code == PlanStage::IS_EOF) { + // The child has returned all of its results. Record this fact so that subsequent calls + // to 'doWork()' will perform sorting and unspool the sorted results. + _populated = true; + + try { + _sortExecutor.loadingDone(); + } catch (const AssertionException&) { + // Propagate runtime errors using the FAILED status code. + *out = WorkingSetCommon::allocateStatusMember(_ws, exceptionToStatus()); + return PlanStage::FAILURE; + } + return PlanStage::NEED_TIME; - } else if (PlanStage::FAILURE == code) { - // The stage which produces a failure is responsible for allocating a working set member - // with error details. - invariant(WorkingSet::INVALID_ID != id); - *out = id; - return code; - } else if (PlanStage::NEED_YIELD == code) { + } else { *out = id; } return code; } - // Returning results. - verify(_resultIterator != _data.end()); - verify(_sorted); - *out = _resultIterator->wsid; - _resultIterator++; + auto nextWsm = _sortExecutor.getNextWsm(); + if (!nextWsm) { + return PlanStage::IS_EOF; + } + *out = _ws->emplace(std::move(*nextWsm)); return PlanStage::ADVANCED; } -unique_ptr SortStage::getStats() { +std::unique_ptr SortStage::getStats() { _commonStats.isEOF = isEOF(); - const size_t maxBytes = static_cast(internalQueryExecMaxBlockingSortBytes.load()); - _specificStats.memLimit = maxBytes; - _specificStats.memUsage = _memUsage; - _specificStats.limit = _limit; - _specificStats.sortPattern = _pattern.getOwned(); - - unique_ptr ret = std::make_unique(_commonStats, STAGE_SORT); - ret->specific = std::make_unique(_specificStats); + std::unique_ptr ret = + std::make_unique(_commonStats, STAGE_SORT); + ret->specific = _sortExecutor.stats(); ret->children.emplace_back(child()->getStats()); return ret; } -const SpecificStats* SortStage::getSpecificStats() const { - return &_specificStats; -} - -/** - * addToBuffer() and sortBuffer() work differently based on the - * configured limit. addToBuffer() is also responsible for - * performing some accounting on the overall memory usage to - * make sure we're not using too much memory. - * - * limit == 0: - * addToBuffer() - Adds item to vector. - * sortBuffer() - Sorts vector. - * limit == 1: - * addToBuffer() - Replaces first item in vector with max of - * current and new item. - * Updates memory usage if item was replaced. - * sortBuffer() - Does nothing. - * limit > 1: - * addToBuffer() - Does not update vector. Adds item to set. - * If size of set exceeds limit, remove item from set - * with lowest key. Updates memory usage accordingly. - * sortBuffer() - Copies items from set to vectors. - */ -void SortStage::addToBuffer(const SortableDataItem& item) { - // Holds ID of working set member to be freed at end of this function. - WorkingSetID wsidToFree = WorkingSet::INVALID_ID; - - WorkingSetMember* member = _ws->get(item.wsid); - if (_limit == 0) { - // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we yield. - member->makeObjOwnedIfNeeded(); - _data.push_back(item); - _memUsage += member->getMemUsage(); - } else if (_limit == 1) { - if (_data.empty()) { - member->makeObjOwnedIfNeeded(); - _data.push_back(item); - _memUsage = member->getMemUsage(); - return; - } - wsidToFree = item.wsid; - const WorkingSetComparator& cmp = *_workingSetComparator; - // Compare new item with existing item in vector. - if (cmp(item, _data[0])) { - wsidToFree = _data[0].wsid; - member->makeObjOwnedIfNeeded(); - _data[0] = item; - _memUsage = member->getMemUsage(); - } - } else { - // Update data item set instead of vector - // Limit not reached - insert and return - vector::size_type limit(_limit); - if (_dataSet->size() < limit) { - member->makeObjOwnedIfNeeded(); - _dataSet->insert(item); - _memUsage += member->getMemUsage(); - return; - } - // Limit will be exceeded - compare with item with lowest key - // If new item does not have a lower key value than last item, - // do nothing. - wsidToFree = item.wsid; - SortableDataItemSet::const_iterator lastItemIt = --(_dataSet->end()); - const SortableDataItem& lastItem = *lastItemIt; - const WorkingSetComparator& cmp = *_workingSetComparator; - if (cmp(item, lastItem)) { - _memUsage -= _ws->get(lastItem.wsid)->getMemUsage(); - _memUsage += member->getMemUsage(); - wsidToFree = lastItem.wsid; - // According to std::set iterator validity rules, - // it does not matter which of erase()/insert() happens first. - // Here, we choose to erase first to release potential resources - // used by the last item and to keep the scope of the iterator to a minimum. - _dataSet->erase(lastItemIt); - member->makeObjOwnedIfNeeded(); - _dataSet->insert(item); - } - } - - // There was a buffered result which we can throw out because we are executing a sort with a - // limit, and the result is now known not to be in the top k set. Free the working set member - // associated with 'wsidToFree'. - if (wsidToFree != WorkingSet::INVALID_ID) { - _ws->free(wsidToFree); - } -} - -void SortStage::sortBuffer() { - if (_limit == 0) { - const WorkingSetComparator& cmp = *_workingSetComparator; - std::sort(_data.begin(), _data.end(), cmp); - } else if (_limit == 1) { - // Buffer contains either 0 or 1 item so it is already in a sorted state. - return; - } else { - // Set already contains items in sorted order, so we simply copy the items - // from the set to the vector. - // Release the memory for the set after the copy. - vector newData(_dataSet->begin(), _dataSet->end()); - _data.swap(newData); - _dataSet.reset(); - } -} - } // namespace mongo diff --git a/src/mongo/db/exec/sort.h b/src/mongo/db/exec/sort.h index 53957917581..f437ce0b719 100644 --- a/src/mongo/db/exec/sort.h +++ b/src/mongo/db/exec/sort.h @@ -51,14 +51,19 @@ namespace mongo { */ class SortStage final : public PlanStage { public: + static constexpr StringData kStageType = "SORT"_sd; + SortStage(boost::intrusive_ptr expCtx, WorkingSet* ws, - BSONObj sortPattern, + SortPattern sortPattern, uint64_t limit, uint64_t maxMemoryUsageBytes, std::unique_ptr child); - bool isEOF() final; + bool isEOF() final { + return _sortExecutor.isEOF(); + } + StageState doWork(WorkingSetID* out) final; StageType stageType() const final { @@ -67,91 +72,24 @@ public: std::unique_ptr getStats(); - const SpecificStats* getSpecificStats() const final; - - static const char* kStageType; + /** + * Returns nullptr. Stats related to sort execution must be extracted with 'getStats()', since + * they are retrieved on demand from the underlying sort execution machinery. + */ + const SpecificStats* getSpecificStats() const final { + return nullptr; + } private: // Not owned by us. WorkingSet* _ws; - // TODO SERVER-42182: Use SortExecutor to implement 'doWork()'. SortExecutor _sortExecutor; - // The raw sort _pattern as expressed by the user - BSONObj _pattern; - - // Equal to 0 for no limit. - size_t _limit; - - // - // Data storage - // - - // Have we sorted our data? If so, we can access _resultIterator. If not, - // we're still populating _data. - bool _sorted; - - // Collection of working set members to sort with their respective sort key. - struct SortableDataItem { - WorkingSetID wsid; - Value sortKey; - // Since we must replicate the behavior of a covered sort as much as possible we use the - // RecordId to break sortKey ties. - // See sorta.js. - RecordId recordId; - }; - - // Comparison object for data buffers (vector and set). Items are compared on (sortKey, loc). - // This is also how the items are ordered in the indices. Keys are compared using - // BSONObj::woCompare() with RecordId as a tie-breaker. - // - // We are comparing keys generated by the SortKeyGenerator, which are already ordered with - // respect the collation. Therefore, we explicitly avoid comparing using a collator here. - struct WorkingSetComparator { - explicit WorkingSetComparator(const BSONObj& pattern) : sortKeyComparator(pattern) {} - - bool operator()(const SortableDataItem& lhs, const SortableDataItem& rhs) const; - - SortKeyComparator sortKeyComparator; - }; - - /** - * Inserts one item into data buffer (vector or set). - * If limit is exceeded, remove item with lowest key. - */ - void addToBuffer(const SortableDataItem& item); - - /** - * Sorts data buffer. - * Assumes no more items will be added to buffer. - * If data is stored in set, copy set - * contents to vector and clear set. - */ - void sortBuffer(); - - // Comparator for data buffer - // Initialization follows sort key generator - std::unique_ptr _workingSetComparator; - - // The data we buffer and sort. - // _data will contain sorted data when all data is gathered - // and sorted. - // When _limit is greater than 1 and not all data has been gathered from child stage, - // _dataSet is used instead to maintain an ordered set of the incomplete data set. - // When the data set is complete, we copy the items from _dataSet to _data which will - // be used to provide the results of this stage through _resultIterator. - std::vector _data; - typedef std::set SortableDataItemSet; - std::unique_ptr _dataSet; - - // Iterates through _data post-sort returning it. - std::vector::iterator _resultIterator; - SortStats _specificStats; - // The usage in bytes of all buffered data that we're sorting. - size_t _memUsage; + // Whether or not we have finished loading data into '_sortExecutor'. + bool _populated = false; }; } // namespace mongo diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp index f90c8576bbf..5cc11e004b0 100644 --- a/src/mongo/db/exec/sort_executor.cpp +++ b/src/mongo/db/exec/sort_executor.cpp @@ -109,6 +109,8 @@ void SortExecutor::add(Value sortKey, WorkingSetMember data) { // Metadata should be attached directly to the WSM rather than inside the Document. invariant(!data.doc.value().metadata()); + _totalDataSizeBytes += data.getMemUsage(); + if (!_sorter) { _sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern))); } @@ -139,6 +141,17 @@ SortOptions SortExecutor::makeSortOptions() const { return opts; } + +std::unique_ptr SortExecutor::stats() const { + auto stats = std::make_unique(); + stats->sortPattern = + _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson(); + stats->limit = _limit; + stats->maxMemoryUsageBytes = _maxMemoryUsageBytes; + stats->totalDataSizeBytes = _totalDataSizeBytes; + stats->wasDiskUsed = _wasDiskUsed; + return stats; +} } // namespace mongo #include "mongo/db/sorter/sorter.cpp" diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h index 3ec21b65fd0..c0945fe1fd4 100644 --- a/src/mongo/db/exec/sort_executor.h +++ b/src/mongo/db/exec/sort_executor.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/sort_key_comparator.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/pipeline/expression.h" @@ -98,6 +99,16 @@ public: */ void add(Value, WorkingSetMember); + /** + * Returns true if the loading phase has been explicitly completed, and then the stream of + * documents has subsequently been exhausted by "get next" calls. + */ + bool isEOF() const { + return _isEOF; + } + + std::unique_ptr stats() const; + private: using DocumentSorter = Sorter; class Comparator { @@ -125,5 +136,6 @@ private: bool _isEOF = false; bool _wasDiskUsed = false; + uint64_t _totalDataSizeBytes = 0u; }; } // namespace mongo diff --git a/src/mongo/db/exec/sort_test.cpp b/src/mongo/db/exec/sort_test.cpp index 349d3a7a1af..9e4494f27a6 100644 --- a/src/mongo/db/exec/sort_test.cpp +++ b/src/mongo/db/exec/sort_test.cpp @@ -109,8 +109,12 @@ public: auto sortKeyGen = std::make_unique( expCtx, std::move(queuedDataStage), &ws, sortPattern); - SortStage sort( - expCtx, &ws, sortPattern, limit, kMaxMemoryUsageBytes, std::move(sortKeyGen)); + SortStage sort(expCtx, + &ws, + SortPattern{sortPattern, expCtx}, + limit, + kMaxMemoryUsageBytes, + std::move(sortKeyGen)); WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState state = PlanStage::NEED_TIME; @@ -170,7 +174,12 @@ TEST_F(SortStageTest, SortEmptyWorkingSet) { auto sortKeyGen = std::make_unique(expCtx, std::move(queuedDataStage), &ws, BSONObj()); auto sortPattern = BSON("a" << 1); - SortStage sort(expCtx, &ws, sortPattern, 0u, kMaxMemoryUsageBytes, std::move(sortKeyGen)); + SortStage sort(expCtx, + &ws, + SortPattern{sortPattern, expCtx}, + 0u, + kMaxMemoryUsageBytes, + std::move(sortKeyGen)); // Check initial EOF state. ASSERT_FALSE(sort.isEOF()); diff --git a/src/mongo/db/exec/working_set.h b/src/mongo/db/exec/working_set.h index 846111d059d..7021ed5eb62 100644 --- a/src/mongo/db/exec/working_set.h +++ b/src/mongo/db/exec/working_set.h @@ -230,6 +230,12 @@ public: return getMemUsage(); } + WorkingSetMember getOwned() const { + auto ret = *this; + ret.makeObjOwnedIfNeeded(); + return ret; + } + private: friend class WorkingSet; -- cgit v1.2.1