/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/exec/sort.h"
#include
#include "mongo/db/catalog/collection.h"
#include "mongo/db/index_names.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/exec/working_set_computed_data.h"
#include "mongo/db/index/btree_key_generator.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/query/qlog.h"
#include "mongo/db/query/query_planner.h"
namespace mongo {
using std::vector;
const size_t kMaxBytes = 32 * 1024 * 1024;
// static
const char* SortStage::kStageType = "SORT";
SortStageKeyGenerator::SortStageKeyGenerator(const Collection* collection,
const BSONObj& sortSpec,
const BSONObj& queryObj) {
_collection = collection;
_hasBounds = false;
_sortHasMeta = false;
_rawSortSpec = sortSpec;
// 'sortSpec' can be a mix of $meta and index key expressions. We pick it apart so that
// we only generate Btree keys for the index key expressions.
// The Btree key fields go in here. We pass this fake index key pattern to the Btree
// key generator below as part of generating sort keys for the docs.
BSONObjBuilder btreeBob;
// The pattern we use to woCompare keys. Each field in 'sortSpec' will go in here with
// a value of 1 or -1. The Btree key fields are verbatim, meta fields have a default.
BSONObjBuilder comparatorBob;
BSONObjIterator it(sortSpec);
while (it.more()) {
BSONElement elt = it.next();
if (elt.isNumber()) {
// Btree key. elt (should be) foo: 1 or foo: -1.
comparatorBob.append(elt);
btreeBob.append(elt);
}
else if (LiteParsedQuery::isTextScoreMeta(elt)) {
// Sort text score decreasing by default. Field name doesn't matter but we choose
// something that a user shouldn't ever have.
comparatorBob.append("$metaTextScore", -1);
_sortHasMeta = true;
}
else {
// Sort spec. should have been validated before here.
verify(false);
}
}
// Our pattern for woComparing keys.
_comparatorObj = comparatorBob.obj();
// The fake index key pattern used to generate Btree keys.
_btreeObj = btreeBob.obj();
// If we're just sorting by meta, don't bother with all the key stuff.
if (_btreeObj.isEmpty()) {
return;
}
// We'll need to treat arrays as if we were to create an index over them. that is,
// we may need to unnest the first level and consider each array element to decide
// the sort order.
std::vector fieldNames;
std::vector fixed;
BSONObjIterator btreeIt(_btreeObj);
while (btreeIt.more()) {
BSONElement patternElt = btreeIt.next();
fieldNames.push_back(patternElt.fieldName());
fixed.push_back(BSONElement());
}
_keyGen.reset(new BtreeKeyGeneratorV1(fieldNames, fixed, false /* not sparse */));
// The bounds checker only works on the Btree part of the sort key.
getBoundsForSort(queryObj, _btreeObj);
if (_hasBounds) {
_boundsChecker.reset(new IndexBoundsChecker(&_bounds, _btreeObj, 1 /* == order */));
}
}
Status SortStageKeyGenerator::getSortKey(const WorkingSetMember& member,
BSONObj* objOut) const {
BSONObj btreeKeyToUse;
Status btreeStatus = getBtreeKey(member.obj, &btreeKeyToUse);
if (!btreeStatus.isOK()) {
return btreeStatus;
}
if (!_sortHasMeta) {
*objOut = btreeKeyToUse;
return Status::OK();
}
BSONObjBuilder mergedKeyBob;
// Merge metadata into the key.
BSONObjIterator it(_rawSortSpec);
BSONObjIterator btreeIt(btreeKeyToUse);
while (it.more()) {
BSONElement elt = it.next();
if (elt.isNumber()) {
// Merge btree key elt.
mergedKeyBob.append(btreeIt.next());
}
else if (LiteParsedQuery::isTextScoreMeta(elt)) {
// Add text score metadata
double score = 0.0;
if (member.hasComputed(WSM_COMPUTED_TEXT_SCORE)) {
const TextScoreComputedData* scoreData
= static_cast(
member.getComputed(WSM_COMPUTED_TEXT_SCORE));
score = scoreData->getScore();
}
mergedKeyBob.append("$metaTextScore", score);
}
}
*objOut = mergedKeyBob.obj();
return Status::OK();
}
Status SortStageKeyGenerator::getBtreeKey(const BSONObj& memberObj, BSONObj* objOut) const {
// Not sorting by anything in the key, just bail out early.
if (_btreeObj.isEmpty()) {
*objOut = BSONObj();
return Status::OK();
}
// We will sort '_data' in the same order an index over '_pattern' would have. This is
// tricky. Consider the sort pattern {a:1} and the document {a:[1, 10]}. We have
// potentially two keys we could use to sort on. Here we extract these keys.
BSONObjCmp patternCmp(_btreeObj);
BSONObjSet keys(patternCmp);
try {
_keyGen->getKeys(memberObj, &keys);
}
catch (const UserException& e) {
// Probably a parallel array.
if (BtreeKeyGenerator::ParallelArraysCode == e.getCode()) {
return Status(ErrorCodes::BadValue,
"cannot sort with keys that are parallel arrays");
}
else {
return e.toStatus();
}
}
catch (...) {
return Status(ErrorCodes::InternalError, "unknown error during sort key generation");
}
// Key generator isn't sparse so we should at least get an all-null key.
invariant(!keys.empty());
// No bounds? No problem! Use the first key.
if (!_hasBounds) {
// Note that we sort 'keys' according to the pattern '_btreeObj'.
*objOut = *keys.begin();
return Status::OK();
}
// To decide which key to use in sorting, we must consider not only the sort pattern but
// the query. Assume we have the query {a: {$gte: 5}} and a document {a:1}. That
// document wouldn't match the query. As such, the key '1' in an array {a: [1, 10]}
// should not be considered as being part of the result set and thus that array cannot
// sort using the key '1'. To ensure that the keys we sort by are valid w.r.t. the
// query we use a bounds checker.
verify(NULL != _boundsChecker.get());
for (BSONObjSet::const_iterator it = keys.begin(); it != keys.end(); ++it) {
if (_boundsChecker->isValidKey(*it)) {
*objOut = *it;
return Status::OK();
}
}
// No key is in our bounds.
// TODO: will this ever happen? don't think it should.
*objOut = *keys.begin();
return Status::OK();
}
void SortStageKeyGenerator::getBoundsForSort(const BSONObj& queryObj, const BSONObj& sortObj) {
QueryPlannerParams params;
params.options = QueryPlannerParams::NO_TABLE_SCAN;
// We're creating a "virtual index" with key pattern equal to the sort order.
IndexEntry sortOrder(sortObj, IndexNames::BTREE, true, false, "doesnt_matter", BSONObj());
params.indices.push_back(sortOrder);
CanonicalQuery* rawQueryForSort;
verify(CanonicalQuery::canonicalize(
"fake_ns", queryObj, &rawQueryForSort, WhereCallbackNoop()).isOK());
auto_ptr queryForSort(rawQueryForSort);
vector solns;
QLOG() << "Sort stage: Planning to obtain bounds for sort." << endl;
QueryPlanner::plan(*queryForSort, params, &solns);
// TODO: are there ever > 1 solns? If so, do we look for a specific soln?
if (1 == solns.size()) {
IndexScanNode* ixScan = NULL;
QuerySolutionNode* rootNode = solns[0]->root.get();
if (rootNode->getType() == STAGE_FETCH) {
FetchNode* fetchNode = static_cast(rootNode);
if (fetchNode->children[0]->getType() != STAGE_IXSCAN) {
delete solns[0];
// No bounds.
return;
}
ixScan = static_cast(fetchNode->children[0]);
}
else if (rootNode->getType() == STAGE_IXSCAN) {
ixScan = static_cast(rootNode);
}
if (ixScan) {
_bounds.fields.swap(ixScan->bounds.fields);
_hasBounds = true;
}
}
for (size_t i = 0; i < solns.size(); ++i) {
delete solns[i];
}
}
SortStage::WorkingSetComparator::WorkingSetComparator(BSONObj p) : pattern(p) { }
bool SortStage::WorkingSetComparator::operator()(const SortableDataItem& lhs, const SortableDataItem& rhs) const {
// False means ignore field names.
int result = lhs.sortKey.woCompare(rhs.sortKey, pattern, false);
if (0 != result) {
return result < 0;
}
// Indices use DiskLoc as an additional sort key so we must as well.
return lhs.loc < rhs.loc;
}
SortStage::SortStage(const SortStageParams& params, WorkingSet* ws, PlanStage* child)
: _collection(params.collection),
_ws(ws),
_child(child),
_pattern(params.pattern),
_query(params.query),
_limit(params.limit),
_sorted(false),
_resultIterator(_data.end()),
_commonStats(kStageType),
_memUsage(0) {
}
SortStage::~SortStage() { }
bool SortStage::isEOF() {
// We're done when our child has no more results, we've sorted the child's results, and
// we've returned all sorted results.
return _child->isEOF() && _sorted && (_data.end() == _resultIterator);
}
PlanStage::StageState SortStage::work(WorkingSetID* out) {
++_commonStats.works;
// Adds the amount of time taken by work() to executionTimeMillis.
ScopedTimer timer(&_commonStats.executionTimeMillis);
if (NULL == _sortKeyGen) {
// This is heavy and should be done as part of work().
_sortKeyGen.reset(new SortStageKeyGenerator(_collection, _pattern, _query));
_sortKeyComparator.reset(new WorkingSetComparator(_sortKeyGen->getSortComparator()));
// If limit > 1, we need to initialize _dataSet here to maintain ordered
// set of data items while fetching from the child stage.
if (_limit > 1) {
const WorkingSetComparator& cmp = *_sortKeyComparator;
_dataSet.reset(new SortableDataItemSet(cmp));
}
return PlanStage::NEED_TIME;
}
if (_memUsage > kMaxBytes) {
mongoutils::str::stream ss;
ss << "sort stage buffered data usage of " << _memUsage
<< " bytes exceeds internal limit of " << kMaxBytes << " bytes";
Status status(ErrorCodes::Overflow, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
return PlanStage::FAILURE;
}
if (isEOF()) { return PlanStage::IS_EOF; }
// Still reading in results to sort.
if (!_sorted) {
WorkingSetID id = WorkingSet::INVALID_ID;
StageState code = _child->work(&id);
if (PlanStage::ADVANCED == code) {
// Add it into the map for quick invalidation if it has a valid DiskLoc.
// A DiskLoc may be invalidated at any time (during a yield). We need to get into
// the WorkingSet as quickly as possible to handle it.
WorkingSetMember* member = _ws->get(id);
// Planner must put a fetch before we get here.
verify(member->hasObj());
// We might be sorting something that was invalidated at some point.
if (member->hasLoc()) {
_wsidByDiskLoc[member->loc] = id;
}
// The data remains in the WorkingSet and we wrap the WSID with the sort key.
SortableDataItem item;
Status sortKeyStatus = _sortKeyGen->getSortKey(*member, &item.sortKey);
if (!_sortKeyGen->getSortKey(*member, &item.sortKey).isOK()) {
*out = WorkingSetCommon::allocateStatusMember(_ws, sortKeyStatus);
return PlanStage::FAILURE;
}
item.wsid = id;
if (member->hasLoc()) {
// The DiskLoc breaks ties when sorting two WSMs with the same sort key.
item.loc = member->loc;
}
addToBuffer(item);
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else if (PlanStage::IS_EOF == code) {
// TODO: We don't need the lock for this. We could ask for a yield and do this work
// unlocked. Also, this is performing a lot of work for one call to work(...)
sortBuffer();
_resultIterator = _data.begin();
_sorted = true;
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else if (PlanStage::FAILURE == code) {
*out = id;
// If a stage fails, it may create a status WSM to indicate why it
// failed, in which case 'id' is valid. If ID is invalid, we
// create our own error message.
if (WorkingSet::INVALID_ID == id) {
mongoutils::str::stream ss;
ss << "sort stage failed to read in results to sort from child";
Status status(ErrorCodes::InternalError, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
}
return code;
}
else {
if (PlanStage::NEED_TIME == code) {
++_commonStats.needTime;
}
return code;
}
}
// Returning results.
verify(_resultIterator != _data.end());
verify(_sorted);
*out = _resultIterator->wsid;
_resultIterator++;
// If we're returning something, take it out of our DL -> WSID map so that future
// calls to invalidate don't cause us to take action for a DL we're done with.
WorkingSetMember* member = _ws->get(*out);
if (member->hasLoc()) {
_wsidByDiskLoc.erase(member->loc);
}
++_commonStats.advanced;
return PlanStage::ADVANCED;
}
void SortStage::prepareToYield() {
++_commonStats.yields;
_child->prepareToYield();
}
void SortStage::recoverFromYield(OperationContext* opCtx) {
++_commonStats.unyields;
_child->recoverFromYield(opCtx);
}
void SortStage::invalidate(const DiskLoc& dl, InvalidationType type) {
++_commonStats.invalidates;
_child->invalidate(dl, type);
// If we have a deletion, we can fetch and carry on.
// If we have a mutation, it's easier to fetch and use the previous document.
// So, no matter what, fetch and keep the doc in play.
// _data contains indices into the WorkingSet, not actual data. If a WorkingSetMember in
// the WorkingSet needs to change state as a result of a DiskLoc invalidation, it will still
// be at the same spot in the WorkingSet. As such, we don't need to modify _data.
DataMap::iterator it = _wsidByDiskLoc.find(dl);
// If we're holding on to data that's got the DiskLoc we're invalidating...
if (_wsidByDiskLoc.end() != it) {
// Grab the WSM that we're nuking.
WorkingSetMember* member = _ws->get(it->second);
verify(member->loc == dl);
WorkingSetCommon::fetchAndInvalidateLoc(member, _collection);
// Remove the DiskLoc from our set of active DLs.
_wsidByDiskLoc.erase(it);
++_specificStats.forcedFetches;
}
}
vector SortStage::getChildren() const {
vector children;
children.push_back(_child.get());
return children;
}
PlanStageStats* SortStage::getStats() {
_commonStats.isEOF = isEOF();
_specificStats.memLimit = kMaxBytes;
_specificStats.memUsage = _memUsage;
_specificStats.limit = _limit;
_specificStats.sortPattern = _pattern.getOwned();
auto_ptr ret(new PlanStageStats(_commonStats, STAGE_SORT));
ret->specific.reset(new SortStats(_specificStats));
ret->children.push_back(_child->getStats());
return ret.release();
}
const CommonStats* SortStage::getCommonStats() {
return &_commonStats;
}
const SpecificStats* SortStage::getSpecificStats() {
return &_specificStats;
}
/**
* addToBuffer() and sortBuffer() work differently based on the
* configured limit. addToBuffer() is also responsible for
* performing some accounting on the overall memory usage to
* make sure we're not using too much memory.
*
* limit == 0:
* addToBuffer() - Adds item to vector.
* sortBuffer() - Sorts vector.
* limit == 1:
* addToBuffer() - Replaces first item in vector with max of
* current and new item.
* Updates memory usage if item was replaced.
* sortBuffer() - Does nothing.
* limit > 1:
* addToBuffer() - Does not update vector. Adds item to set.
* If size of set exceeds limit, remove item from set
* with lowest key. Updates memory usage accordingly.
* sortBuffer() - Copies items from set to vectors.
*/
void SortStage::addToBuffer(const SortableDataItem& item) {
// Holds ID of working set member to be freed at end of this function.
WorkingSetID wsidToFree = WorkingSet::INVALID_ID;
if (_limit == 0) {
_data.push_back(item);
_memUsage += _ws->get(item.wsid)->getMemUsage();
}
else if (_limit == 1) {
if (_data.empty()) {
_data.push_back(item);
_memUsage = _ws->get(item.wsid)->getMemUsage();
return;
}
wsidToFree = item.wsid;
const WorkingSetComparator& cmp = *_sortKeyComparator;
// Compare new item with existing item in vector.
if (cmp(item, _data[0])) {
wsidToFree = _data[0].wsid;
_data[0] = item;
_memUsage = _ws->get(item.wsid)->getMemUsage();
}
}
else {
// Update data item set instead of vector
// Limit not reached - insert and return
vector::size_type limit(_limit);
if (_dataSet->size() < limit) {
_dataSet->insert(item);
_memUsage += _ws->get(item.wsid)->getMemUsage();
return;
}
// Limit will be exceeded - compare with item with lowest key
// If new item does not have a lower key value than last item,
// do nothing.
wsidToFree = item.wsid;
SortableDataItemSet::const_iterator lastItemIt = --(_dataSet->end());
const SortableDataItem& lastItem = *lastItemIt;
const WorkingSetComparator& cmp = *_sortKeyComparator;
if (cmp(item, lastItem)) {
_memUsage -= _ws->get(lastItem.wsid)->getMemUsage();
_memUsage += _ws->get(item.wsid)->getMemUsage();
wsidToFree = lastItem.wsid;
// According to std::set iterator validity rules,
// it does not matter which of erase()/insert() happens first.
// Here, we choose to erase first to release potential resources
// used by the last item and to keep the scope of the iterator to a minimum.
_dataSet->erase(lastItemIt);
_dataSet->insert(item);
}
}
// If the working set ID is valid, remove from
// DiskLoc invalidation map and free from working set.
if (wsidToFree != WorkingSet::INVALID_ID) {
WorkingSetMember* member = _ws->get(wsidToFree);
if (member->hasLoc()) {
_wsidByDiskLoc.erase(member->loc);
}
_ws->free(wsidToFree);
}
}
void SortStage::sortBuffer() {
if (_limit == 0) {
const WorkingSetComparator& cmp = *_sortKeyComparator;
std::sort(_data.begin(), _data.end(), cmp);
}
else if (_limit == 1) {
// Buffer contains either 0 or 1 item so it is already in a sorted state.
return;
}
else {
// Set already contains items in sorted order, so we simply copy the items
// from the set to the vector.
// Release the memory for the set after the copy.
vector newData(_dataSet->begin(), _dataSet->end());
_data.swap(newData);
_dataSet.reset();
}
}
} // namespace mongo