summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2019-10-01 19:36:43 +0000
committerevergreen <evergreen@mongodb.com>2019-10-01 19:36:43 +0000
commit6d0a0e46b987dbc69a5694ff03d53d7e7e25b065 (patch)
tree65100de1652fcfebdc2569d8910972d2ae891fa4 /src/mongo/db/exec
parent0187f0fed3630b3f519daeb7290f40a92b700691 (diff)
downloadmongo-6d0a0e46b987dbc69a5694ff03d53d7e7e25b065.tar.gz
SERVER-42182 Reimplement SortStage in terms of SortExecutor.
This consolidates the implementations of DocumentSourceSort and SortStage to both use the same underlying sort execution code. It also allows a future change to expose external sort for find command (which currently requires 'enableTestCommands=true').
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/plan_stats.h23
-rw-r--r--src/mongo/db/exec/sort.cpp265
-rw-r--r--src/mongo/db/exec/sort.h94
-rw-r--r--src/mongo/db/exec/sort_executor.cpp13
-rw-r--r--src/mongo/db/exec/sort_executor.h12
-rw-r--r--src/mongo/db/exec/sort_test.cpp15
-rw-r--r--src/mongo/db/exec/working_set.h6
7 files changed, 120 insertions, 308 deletions
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index cff5085e47a..65cafb5d3bc 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -615,17 +615,24 @@ struct SortStats : public SpecificStats {
return sortPattern.objsize() + sizeof(*this);
}
- // What's our current memory usage?
- size_t memUsage = 0u;
-
- // What's our memory limit?
- size_t memLimit = 0u;
+ // The pattern according to which we are sorting.
+ BSONObj sortPattern;
// The number of results to return from the sort.
- size_t limit = 0u;
+ uint64_t limit = 0u;
- // The pattern according to which we are sorting.
- BSONObj sortPattern;
+ // The maximum number of bytes of memory we're willing to use during execution of the sort. If
+ // this limit is exceeded and 'allowDiskUse' is false, the query will fail at execution time. If
+ // 'allowDiskUse' is true, the data will be spilled to disk.
+ uint64_t maxMemoryUsageBytes = 0u;
+
+ // The amount of data we've sorted in bytes. At various times this data may be buffered in
+ // memory or disk-resident, depending on the configuration of 'maxMemoryUsageBytes' and whether
+ // disk use is allowed.
+ uint64_t totalDataSizeBytes = 0u;
+
+ // Whether we spilled data to disk during the execution of this query.
+ bool wasDiskUsed = false;
};
struct MergeSortStats : public SpecificStats {
diff --git a/src/mongo/db/exec/sort.cpp b/src/mongo/db/exec/sort.cpp
index c7c3cbe3ea7..eb5486b61ee 100644
--- a/src/mongo/db/exec/sort.cpp
+++ b/src/mongo/db/exec/sort.cpp
@@ -27,266 +27,93 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+#include "mongo/platform/basic.h"
#include "mongo/db/exec/sort.h"
-
-#include <algorithm>
-#include <memory>
-
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/index/btree_key_generator.h"
-#include "mongo/db/index_names.h"
-#include "mongo/db/query/find_common.h"
-#include "mongo/db/query/query_knobs_gen.h"
-#include "mongo/db/query/query_planner.h"
-#include "mongo/util/log.h"
namespace mongo {
-using std::endl;
-using std::unique_ptr;
-using std::vector;
-
-// static
-const char* SortStage::kStageType = "SORT";
-
-bool SortStage::WorkingSetComparator::operator()(const SortableDataItem& lhs,
- const SortableDataItem& rhs) const {
- int cmp = sortKeyComparator(lhs.sortKey, rhs.sortKey);
- if (cmp != 0) {
- return cmp < 0;
- } else {
- // Indexes use recordId as a final tiebreaker, and we do the same here, so that SortStage
- // outputs documents in the same order that an index would have.
- return lhs.recordId < rhs.recordId;
- }
-}
-
SortStage::SortStage(boost::intrusive_ptr<ExpressionContext> expCtx,
WorkingSet* ws,
- BSONObj sortPattern,
+ SortPattern sortPattern,
uint64_t limit,
uint64_t maxMemoryUsageBytes,
std::unique_ptr<PlanStage> child)
- : PlanStage(kStageType, expCtx->opCtx),
+ : PlanStage(kStageType.rawData(), expCtx->opCtx),
_ws(ws),
- _sortExecutor(SortPattern{sortPattern, expCtx},
+ _sortExecutor(std::move(sortPattern),
limit,
maxMemoryUsageBytes,
expCtx->tempDir,
- expCtx->allowDiskUse),
- _pattern(std::move(sortPattern)),
- _limit(limit),
- _sorted(false),
- _resultIterator(_data.end()),
- _memUsage(0) {
+ expCtx->allowDiskUse) {
_children.emplace_back(std::move(child));
-
- BSONObj sortComparator = FindCommon::transformSortSpec(_pattern);
- _workingSetComparator = std::make_unique<WorkingSetComparator>(sortComparator);
-
- // If limit > 1, we need to initialize _dataSet here to maintain ordered set of data items while
- // fetching from the child stage.
- if (_limit > 1) {
- const WorkingSetComparator& cmp = *_workingSetComparator;
- _dataSet.reset(new SortableDataItemSet(cmp));
- }
-}
-
-bool SortStage::isEOF() {
- // We're done when our child has no more results, we've sorted the child's results, and
- // we've returned all sorted results.
- return child()->isEOF() && _sorted && (_data.end() == _resultIterator);
}
PlanStage::StageState SortStage::doWork(WorkingSetID* out) {
- const size_t maxBytes = static_cast<size_t>(internalQueryExecMaxBlockingSortBytes.load());
- if (_memUsage > maxBytes) {
- str::stream ss;
- ss << "Sort operation used more than the maximum " << maxBytes
- << " bytes of RAM. Add an index, or specify a smaller limit.";
- Status status(ErrorCodes::OperationFailed, ss);
- *out = WorkingSetCommon::allocateStatusMember(_ws, status);
- return PlanStage::FAILURE;
- }
-
if (isEOF()) {
return PlanStage::IS_EOF;
}
- // Still reading in results to sort.
- if (!_sorted) {
+ if (!_populated) {
WorkingSetID id = WorkingSet::INVALID_ID;
- StageState code = child()->work(&id);
-
- if (PlanStage::ADVANCED == code) {
- WorkingSetMember* member = _ws->get(id);
-
- SortableDataItem item;
- item.wsid = id;
-
- // We extract the sort key from the WSM's metadata. This must have been generated by a
- // SortKeyGeneratorStage descendent in the execution tree.
- item.sortKey = member->metadata().getSortKey();
-
- if (member->hasRecordId()) {
- // The RecordId breaks ties when sorting two WSMs with the same sort key.
- item.recordId = member->recordId;
+ const StageState code = child()->work(&id);
+
+ if (code == PlanStage::ADVANCED) {
+ // The plan must be structured such that a previous stage has attached the sort key
+ // metadata.
+ auto member = _ws->get(id);
+ invariant(member->metadata().hasSortKey());
+
+ auto&& extractedMember = _ws->extract(id);
+
+ try {
+ auto sortKey = extractedMember.metadata().getSortKey();
+ _sortExecutor.add(std::move(sortKey), std::move(extractedMember));
+ } catch (const AssertionException&) {
+ // Propagate runtime errors using the FAILED status code.
+ *out = WorkingSetCommon::allocateStatusMember(_ws, exceptionToStatus());
+ return PlanStage::FAILURE;
}
- addToBuffer(item);
-
return PlanStage::NEED_TIME;
- } else if (PlanStage::IS_EOF == code) {
- // TODO: We don't need the lock for this. We could ask for a yield and do this work
- // unlocked. Also, this is performing a lot of work for one call to work(...)
- sortBuffer();
- _resultIterator = _data.begin();
- _sorted = true;
+ } else if (code == PlanStage::IS_EOF) {
+ // The child has returned all of its results. Record this fact so that subsequent calls
+ // to 'doWork()' will perform sorting and unspool the sorted results.
+ _populated = true;
+
+ try {
+ _sortExecutor.loadingDone();
+ } catch (const AssertionException&) {
+ // Propagate runtime errors using the FAILED status code.
+ *out = WorkingSetCommon::allocateStatusMember(_ws, exceptionToStatus());
+ return PlanStage::FAILURE;
+ }
+
return PlanStage::NEED_TIME;
- } else if (PlanStage::FAILURE == code) {
- // The stage which produces a failure is responsible for allocating a working set member
- // with error details.
- invariant(WorkingSet::INVALID_ID != id);
- *out = id;
- return code;
- } else if (PlanStage::NEED_YIELD == code) {
+ } else {
*out = id;
}
return code;
}
- // Returning results.
- verify(_resultIterator != _data.end());
- verify(_sorted);
- *out = _resultIterator->wsid;
- _resultIterator++;
+ auto nextWsm = _sortExecutor.getNextWsm();
+ if (!nextWsm) {
+ return PlanStage::IS_EOF;
+ }
+ *out = _ws->emplace(std::move(*nextWsm));
return PlanStage::ADVANCED;
}
-unique_ptr<PlanStageStats> SortStage::getStats() {
+std::unique_ptr<PlanStageStats> SortStage::getStats() {
_commonStats.isEOF = isEOF();
- const size_t maxBytes = static_cast<size_t>(internalQueryExecMaxBlockingSortBytes.load());
- _specificStats.memLimit = maxBytes;
- _specificStats.memUsage = _memUsage;
- _specificStats.limit = _limit;
- _specificStats.sortPattern = _pattern.getOwned();
-
- unique_ptr<PlanStageStats> ret = std::make_unique<PlanStageStats>(_commonStats, STAGE_SORT);
- ret->specific = std::make_unique<SortStats>(_specificStats);
+ std::unique_ptr<PlanStageStats> ret =
+ std::make_unique<PlanStageStats>(_commonStats, STAGE_SORT);
+ ret->specific = _sortExecutor.stats();
ret->children.emplace_back(child()->getStats());
return ret;
}
-const SpecificStats* SortStage::getSpecificStats() const {
- return &_specificStats;
-}
-
-/**
- * addToBuffer() and sortBuffer() work differently based on the
- * configured limit. addToBuffer() is also responsible for
- * performing some accounting on the overall memory usage to
- * make sure we're not using too much memory.
- *
- * limit == 0:
- * addToBuffer() - Adds item to vector.
- * sortBuffer() - Sorts vector.
- * limit == 1:
- * addToBuffer() - Replaces first item in vector with max of
- * current and new item.
- * Updates memory usage if item was replaced.
- * sortBuffer() - Does nothing.
- * limit > 1:
- * addToBuffer() - Does not update vector. Adds item to set.
- * If size of set exceeds limit, remove item from set
- * with lowest key. Updates memory usage accordingly.
- * sortBuffer() - Copies items from set to vectors.
- */
-void SortStage::addToBuffer(const SortableDataItem& item) {
- // Holds ID of working set member to be freed at end of this function.
- WorkingSetID wsidToFree = WorkingSet::INVALID_ID;
-
- WorkingSetMember* member = _ws->get(item.wsid);
- if (_limit == 0) {
- // Ensure that the BSONObj underlying the WorkingSetMember is owned in case we yield.
- member->makeObjOwnedIfNeeded();
- _data.push_back(item);
- _memUsage += member->getMemUsage();
- } else if (_limit == 1) {
- if (_data.empty()) {
- member->makeObjOwnedIfNeeded();
- _data.push_back(item);
- _memUsage = member->getMemUsage();
- return;
- }
- wsidToFree = item.wsid;
- const WorkingSetComparator& cmp = *_workingSetComparator;
- // Compare new item with existing item in vector.
- if (cmp(item, _data[0])) {
- wsidToFree = _data[0].wsid;
- member->makeObjOwnedIfNeeded();
- _data[0] = item;
- _memUsage = member->getMemUsage();
- }
- } else {
- // Update data item set instead of vector
- // Limit not reached - insert and return
- vector<SortableDataItem>::size_type limit(_limit);
- if (_dataSet->size() < limit) {
- member->makeObjOwnedIfNeeded();
- _dataSet->insert(item);
- _memUsage += member->getMemUsage();
- return;
- }
- // Limit will be exceeded - compare with item with lowest key
- // If new item does not have a lower key value than last item,
- // do nothing.
- wsidToFree = item.wsid;
- SortableDataItemSet::const_iterator lastItemIt = --(_dataSet->end());
- const SortableDataItem& lastItem = *lastItemIt;
- const WorkingSetComparator& cmp = *_workingSetComparator;
- if (cmp(item, lastItem)) {
- _memUsage -= _ws->get(lastItem.wsid)->getMemUsage();
- _memUsage += member->getMemUsage();
- wsidToFree = lastItem.wsid;
- // According to std::set iterator validity rules,
- // it does not matter which of erase()/insert() happens first.
- // Here, we choose to erase first to release potential resources
- // used by the last item and to keep the scope of the iterator to a minimum.
- _dataSet->erase(lastItemIt);
- member->makeObjOwnedIfNeeded();
- _dataSet->insert(item);
- }
- }
-
- // There was a buffered result which we can throw out because we are executing a sort with a
- // limit, and the result is now known not to be in the top k set. Free the working set member
- // associated with 'wsidToFree'.
- if (wsidToFree != WorkingSet::INVALID_ID) {
- _ws->free(wsidToFree);
- }
-}
-
-void SortStage::sortBuffer() {
- if (_limit == 0) {
- const WorkingSetComparator& cmp = *_workingSetComparator;
- std::sort(_data.begin(), _data.end(), cmp);
- } else if (_limit == 1) {
- // Buffer contains either 0 or 1 item so it is already in a sorted state.
- return;
- } else {
- // Set already contains items in sorted order, so we simply copy the items
- // from the set to the vector.
- // Release the memory for the set after the copy.
- vector<SortableDataItem> newData(_dataSet->begin(), _dataSet->end());
- _data.swap(newData);
- _dataSet.reset();
- }
-}
-
} // namespace mongo
diff --git a/src/mongo/db/exec/sort.h b/src/mongo/db/exec/sort.h
index 53957917581..f437ce0b719 100644
--- a/src/mongo/db/exec/sort.h
+++ b/src/mongo/db/exec/sort.h
@@ -51,14 +51,19 @@ namespace mongo {
*/
class SortStage final : public PlanStage {
public:
+ static constexpr StringData kStageType = "SORT"_sd;
+
SortStage(boost::intrusive_ptr<ExpressionContext> expCtx,
WorkingSet* ws,
- BSONObj sortPattern,
+ SortPattern sortPattern,
uint64_t limit,
uint64_t maxMemoryUsageBytes,
std::unique_ptr<PlanStage> child);
- bool isEOF() final;
+ bool isEOF() final {
+ return _sortExecutor.isEOF();
+ }
+
StageState doWork(WorkingSetID* out) final;
StageType stageType() const final {
@@ -67,91 +72,24 @@ public:
std::unique_ptr<PlanStageStats> getStats();
- const SpecificStats* getSpecificStats() const final;
-
- static const char* kStageType;
+ /**
+ * Returns nullptr. Stats related to sort execution must be extracted with 'getStats()', since
+ * they are retrieved on demand from the underlying sort execution machinery.
+ */
+ const SpecificStats* getSpecificStats() const final {
+ return nullptr;
+ }
private:
// Not owned by us.
WorkingSet* _ws;
- // TODO SERVER-42182: Use SortExecutor to implement 'doWork()'.
SortExecutor _sortExecutor;
- // The raw sort _pattern as expressed by the user
- BSONObj _pattern;
-
- // Equal to 0 for no limit.
- size_t _limit;
-
- //
- // Data storage
- //
-
- // Have we sorted our data? If so, we can access _resultIterator. If not,
- // we're still populating _data.
- bool _sorted;
-
- // Collection of working set members to sort with their respective sort key.
- struct SortableDataItem {
- WorkingSetID wsid;
- Value sortKey;
- // Since we must replicate the behavior of a covered sort as much as possible we use the
- // RecordId to break sortKey ties.
- // See sorta.js.
- RecordId recordId;
- };
-
- // Comparison object for data buffers (vector and set). Items are compared on (sortKey, loc).
- // This is also how the items are ordered in the indices. Keys are compared using
- // BSONObj::woCompare() with RecordId as a tie-breaker.
- //
- // We are comparing keys generated by the SortKeyGenerator, which are already ordered with
- // respect the collation. Therefore, we explicitly avoid comparing using a collator here.
- struct WorkingSetComparator {
- explicit WorkingSetComparator(const BSONObj& pattern) : sortKeyComparator(pattern) {}
-
- bool operator()(const SortableDataItem& lhs, const SortableDataItem& rhs) const;
-
- SortKeyComparator sortKeyComparator;
- };
-
- /**
- * Inserts one item into data buffer (vector or set).
- * If limit is exceeded, remove item with lowest key.
- */
- void addToBuffer(const SortableDataItem& item);
-
- /**
- * Sorts data buffer.
- * Assumes no more items will be added to buffer.
- * If data is stored in set, copy set
- * contents to vector and clear set.
- */
- void sortBuffer();
-
- // Comparator for data buffer
- // Initialization follows sort key generator
- std::unique_ptr<WorkingSetComparator> _workingSetComparator;
-
- // The data we buffer and sort.
- // _data will contain sorted data when all data is gathered
- // and sorted.
- // When _limit is greater than 1 and not all data has been gathered from child stage,
- // _dataSet is used instead to maintain an ordered set of the incomplete data set.
- // When the data set is complete, we copy the items from _dataSet to _data which will
- // be used to provide the results of this stage through _resultIterator.
- std::vector<SortableDataItem> _data;
- typedef std::set<SortableDataItem, WorkingSetComparator> SortableDataItemSet;
- std::unique_ptr<SortableDataItemSet> _dataSet;
-
- // Iterates through _data post-sort returning it.
- std::vector<SortableDataItem>::iterator _resultIterator;
-
SortStats _specificStats;
- // The usage in bytes of all buffered data that we're sorting.
- size_t _memUsage;
+ // Whether or not we have finished loading data into '_sortExecutor'.
+ bool _populated = false;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/sort_executor.cpp b/src/mongo/db/exec/sort_executor.cpp
index f90c8576bbf..5cc11e004b0 100644
--- a/src/mongo/db/exec/sort_executor.cpp
+++ b/src/mongo/db/exec/sort_executor.cpp
@@ -109,6 +109,8 @@ void SortExecutor::add(Value sortKey, WorkingSetMember data) {
// Metadata should be attached directly to the WSM rather than inside the Document.
invariant(!data.doc.value().metadata());
+ _totalDataSizeBytes += data.getMemUsage();
+
if (!_sorter) {
_sorter.reset(DocumentSorter::make(makeSortOptions(), Comparator(_sortPattern)));
}
@@ -139,6 +141,17 @@ SortOptions SortExecutor::makeSortOptions() const {
return opts;
}
+
+std::unique_ptr<SortStats> SortExecutor::stats() const {
+ auto stats = std::make_unique<SortStats>();
+ stats->sortPattern =
+ _sortPattern.serialize(SortPattern::SortKeySerialization::kForExplain).toBson();
+ stats->limit = _limit;
+ stats->maxMemoryUsageBytes = _maxMemoryUsageBytes;
+ stats->totalDataSizeBytes = _totalDataSizeBytes;
+ stats->wasDiskUsed = _wasDiskUsed;
+ return stats;
+}
} // namespace mongo
#include "mongo/db/sorter/sorter.cpp"
diff --git a/src/mongo/db/exec/sort_executor.h b/src/mongo/db/exec/sort_executor.h
index 3ec21b65fd0..c0945fe1fd4 100644
--- a/src/mongo/db/exec/sort_executor.h
+++ b/src/mongo/db/exec/sort_executor.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/sort_key_comparator.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/pipeline/expression.h"
@@ -98,6 +99,16 @@ public:
*/
void add(Value, WorkingSetMember);
+ /**
+ * Returns true if the loading phase has been explicitly completed, and then the stream of
+ * documents has subsequently been exhausted by "get next" calls.
+ */
+ bool isEOF() const {
+ return _isEOF;
+ }
+
+ std::unique_ptr<SortStats> stats() const;
+
private:
using DocumentSorter = Sorter<Value, WorkingSetMember>;
class Comparator {
@@ -125,5 +136,6 @@ private:
bool _isEOF = false;
bool _wasDiskUsed = false;
+ uint64_t _totalDataSizeBytes = 0u;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/sort_test.cpp b/src/mongo/db/exec/sort_test.cpp
index 349d3a7a1af..9e4494f27a6 100644
--- a/src/mongo/db/exec/sort_test.cpp
+++ b/src/mongo/db/exec/sort_test.cpp
@@ -109,8 +109,12 @@ public:
auto sortKeyGen = std::make_unique<SortKeyGeneratorStage>(
expCtx, std::move(queuedDataStage), &ws, sortPattern);
- SortStage sort(
- expCtx, &ws, sortPattern, limit, kMaxMemoryUsageBytes, std::move(sortKeyGen));
+ SortStage sort(expCtx,
+ &ws,
+ SortPattern{sortPattern, expCtx},
+ limit,
+ kMaxMemoryUsageBytes,
+ std::move(sortKeyGen));
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState state = PlanStage::NEED_TIME;
@@ -170,7 +174,12 @@ TEST_F(SortStageTest, SortEmptyWorkingSet) {
auto sortKeyGen =
std::make_unique<SortKeyGeneratorStage>(expCtx, std::move(queuedDataStage), &ws, BSONObj());
auto sortPattern = BSON("a" << 1);
- SortStage sort(expCtx, &ws, sortPattern, 0u, kMaxMemoryUsageBytes, std::move(sortKeyGen));
+ SortStage sort(expCtx,
+ &ws,
+ SortPattern{sortPattern, expCtx},
+ 0u,
+ kMaxMemoryUsageBytes,
+ std::move(sortKeyGen));
// Check initial EOF state.
ASSERT_FALSE(sort.isEOF());
diff --git a/src/mongo/db/exec/working_set.h b/src/mongo/db/exec/working_set.h
index 846111d059d..7021ed5eb62 100644
--- a/src/mongo/db/exec/working_set.h
+++ b/src/mongo/db/exec/working_set.h
@@ -230,6 +230,12 @@ public:
return getMemUsage();
}
+ WorkingSetMember getOwned() const {
+ auto ret = *this;
+ ret.makeObjOwnedIfNeeded();
+ return ret;
+ }
+
private:
friend class WorkingSet;