/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/exec/and_hash.h"
#include "mongo/db/exec/and_common-inl.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/util/mongoutils/str.h"
namespace {
// Upper limit for buffered data.
// Stage execution will fail once size of all buffered data exceeds this threshold.
const size_t kDefaultMaxMemUsageBytes = 32 * 1024 * 1024;
} // namespace
namespace mongo {
using std::unique_ptr;
using std::vector;
const size_t AndHashStage::kLookAheadWorks = 10;
// static
const char* AndHashStage::kStageType = "AND_HASH";
AndHashStage::AndHashStage(WorkingSet* ws, const Collection* collection)
: _collection(collection),
_ws(ws),
_hashingChildren(true),
_currentChild(0),
_commonStats(kStageType),
_memUsage(0),
_maxMemUsage(kDefaultMaxMemUsageBytes) {}
AndHashStage::AndHashStage(WorkingSet* ws,
const Collection* collection,
size_t maxMemUsage)
: _collection(collection),
_ws(ws),
_hashingChildren(true),
_currentChild(0),
_commonStats(kStageType),
_memUsage(0),
_maxMemUsage(maxMemUsage) {}
AndHashStage::~AndHashStage() {
for (size_t i = 0; i < _children.size(); ++i) { delete _children[i]; }
}
void AndHashStage::addChild(PlanStage* child) { _children.push_back(child); }
size_t AndHashStage::getMemUsage() const {
return _memUsage;
}
bool AndHashStage::isEOF() {
// This is empty before calling work() and not-empty after.
if (_lookAheadResults.empty()) { return false; }
// Either we're busy hashing children, in which case we're not done yet.
if (_hashingChildren) { return false; }
// Or we're streaming in results from the last child.
// If there's nothing to probe against, we're EOF.
if (_dataMap.empty()) { return true; }
// Otherwise, we're done when the last child is done.
invariant(_children.size() >= 2);
return (WorkingSet::INVALID_ID == _lookAheadResults[_children.size() - 1])
&& _children[_children.size() - 1]->isEOF();
}
PlanStage::StageState AndHashStage::work(WorkingSetID* out) {
++_commonStats.works;
// Adds the amount of time taken by work() to executionTimeMillis.
ScopedTimer timer(&_commonStats.executionTimeMillis);
if (isEOF()) { return PlanStage::IS_EOF; }
// Fast-path for one of our children being EOF immediately. We work each child a few times.
// If it hits EOF, the AND cannot output anything. If it produces a result, we stash that
// result in _lookAheadResults.
if (_lookAheadResults.empty()) {
// INVALID_ID means that the child didn't produce a valid result.
// We specifically are not using .resize(size, value) here because C++11 builds don't
// seem to resolve WorkingSet::INVALID_ID during linking.
_lookAheadResults.resize(_children.size());
for (size_t i = 0; i < _children.size(); ++i) {
_lookAheadResults[i] = WorkingSet::INVALID_ID;
}
// Work each child some number of times until it's either EOF or produces
// a result. If it's EOF this whole stage will be EOF. If it produces a
// result we cache it for later.
for (size_t i = 0; i < _children.size(); ++i) {
PlanStage* child = _children[i];
for (size_t j = 0; j < kLookAheadWorks; ++j) {
StageState childStatus = child->work(&_lookAheadResults[i]);
if (PlanStage::IS_EOF == childStatus) {
// A child went right to EOF. Bail out.
_hashingChildren = false;
_dataMap.clear();
return PlanStage::IS_EOF;
}
else if (PlanStage::ADVANCED == childStatus) {
// We have a result cached in _lookAheadResults[i]. Stop looking at this
// child.
break;
}
else if (PlanStage::FAILURE == childStatus || PlanStage::DEAD == childStatus) {
// Propage error to parent.
*out = _lookAheadResults[i];
// If a stage fails, it may create a status WSM to indicate why it
// failed, in which case 'id' is valid. If ID is invalid, we
// create our own error message.
if (WorkingSet::INVALID_ID == *out) {
mongoutils::str::stream ss;
ss << "hashed AND stage failed to read in look ahead results "
<< "from child " << i
<< ", childStatus: " << PlanStage::stateStr(childStatus);
Status status(ErrorCodes::InternalError, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
}
_hashingChildren = false;
_dataMap.clear();
return childStatus;
}
// We ignore NEED_TIME. TODO: what do we want to do if we get NEED_YIELD here?
}
}
// We did a bunch of work above, return NEED_TIME to be fair.
return PlanStage::NEED_TIME;
}
// An AND is either reading the first child into the hash table, probing against the hash
// table with subsequent children, or checking the last child's results to see if they're
// in the hash table.
// We read the first child into our hash table.
if (_hashingChildren) {
// Check memory usage of previously hashed results.
if (_memUsage > _maxMemUsage) {
mongoutils::str::stream ss;
ss << "hashed AND stage buffered data usage of " << _memUsage
<< " bytes exceeds internal limit of " << kDefaultMaxMemUsageBytes << " bytes";
Status status(ErrorCodes::Overflow, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
return PlanStage::FAILURE;
}
if (0 == _currentChild) {
return readFirstChild(out);
}
else if (_currentChild < _children.size() - 1) {
return hashOtherChildren(out);
}
else {
_hashingChildren = false;
// We don't hash our last child. Instead, we probe the table created from the
// previous children, returning results in the order of the last child.
// Fall through to below.
}
}
// Returning results. We read from the last child and return the results that are in our
// hash map.
// We should be EOF if we're not hashing results and the dataMap is empty.
verify(!_dataMap.empty());
// We probe _dataMap with the last child.
verify(_currentChild == _children.size() - 1);
// Get the next result for the (_children.size() - 1)-th child.
StageState childStatus = workChild(_children.size() - 1, out);
if (PlanStage::ADVANCED != childStatus) {
return childStatus;
}
// We know that we've ADVANCED. See if the WSM is in our table.
WorkingSetMember* member = _ws->get(*out);
// Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything
// with this WSM.
if (!member->hasLoc()) {
_ws->flagForReview(*out);
return PlanStage::NEED_TIME;
}
DataMap::iterator it = _dataMap.find(member->loc);
if (_dataMap.end() == it) {
// Child's output wasn't in every previous child. Throw it out.
_ws->free(*out);
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else {
// Child's output was in every previous child. Merge any key data in
// the child's output and free the child's just-outputted WSM.
WorkingSetID hashID = it->second;
_dataMap.erase(it);
WorkingSetMember* olderMember = _ws->get(hashID);
AndCommon::mergeFrom(olderMember, *member);
_ws->free(*out);
++_commonStats.advanced;
*out = hashID;
return PlanStage::ADVANCED;
}
}
PlanStage::StageState AndHashStage::workChild(size_t childNo, WorkingSetID* out) {
if (WorkingSet::INVALID_ID != _lookAheadResults[childNo]) {
*out = _lookAheadResults[childNo];
_lookAheadResults[childNo] = WorkingSet::INVALID_ID;
return PlanStage::ADVANCED;
}
else {
return _children[childNo]->work(out);
}
}
PlanStage::StageState AndHashStage::readFirstChild(WorkingSetID* out) {
verify(_currentChild == 0);
WorkingSetID id = WorkingSet::INVALID_ID;
StageState childStatus = workChild(0, &id);
if (PlanStage::ADVANCED == childStatus) {
WorkingSetMember* member = _ws->get(id);
// Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything
// with this WSM.
if (!member->hasLoc()) {
_ws->flagForReview(id);
return PlanStage::NEED_TIME;
}
if (!_dataMap.insert(std::make_pair(member->loc, id)).second) {
// Didn't insert because we already had this loc inside the map. This should only
// happen if we're seeing a newer copy of the same doc in a more recent snapshot.
// Throw out the newer copy of the doc.
_ws->free(id);
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
// Update memory stats.
_memUsage += member->getMemUsage();
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else if (PlanStage::IS_EOF == childStatus) {
// Done reading child 0.
_currentChild = 1;
// If our first child was empty, don't scan any others, no possible results.
if (_dataMap.empty()) {
_hashingChildren = false;
return PlanStage::IS_EOF;
}
++_commonStats.needTime;
_specificStats.mapAfterChild.push_back(_dataMap.size());
return PlanStage::NEED_TIME;
}
else if (PlanStage::FAILURE == childStatus || PlanStage::DEAD == childStatus) {
*out = id;
// If a stage fails, it may create a status WSM to indicate why it
// failed, in which case 'id' is valid. If ID is invalid, we
// create our own error message.
if (WorkingSet::INVALID_ID == id) {
mongoutils::str::stream ss;
ss << "hashed AND stage failed to read in results to from first child";
Status status(ErrorCodes::InternalError, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
}
return childStatus;
}
else {
if (PlanStage::NEED_TIME == childStatus) {
++_commonStats.needTime;
}
else if (PlanStage::NEED_YIELD == childStatus) {
++_commonStats.needYield;
*out = id;
}
return childStatus;
}
}
PlanStage::StageState AndHashStage::hashOtherChildren(WorkingSetID* out) {
verify(_currentChild > 0);
WorkingSetID id = WorkingSet::INVALID_ID;
StageState childStatus = workChild(_currentChild, &id);
if (PlanStage::ADVANCED == childStatus) {
WorkingSetMember* member = _ws->get(id);
// Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything
// with this WSM.
if (!member->hasLoc()) {
_ws->flagForReview(id);
return PlanStage::NEED_TIME;
}
verify(member->hasLoc());
if (_dataMap.end() == _dataMap.find(member->loc)) {
// Ignore. It's not in any previous child.
}
else {
// We have a hit. Copy data into the WSM we already have.
_seenMap.insert(member->loc);
WorkingSetMember* olderMember = _ws->get(_dataMap[member->loc]);
size_t memUsageBefore = olderMember->getMemUsage();
AndCommon::mergeFrom(olderMember, *member);
// Update memory stats.
_memUsage += olderMember->getMemUsage() - memUsageBefore;
}
_ws->free(id);
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else if (PlanStage::IS_EOF == childStatus) {
// Finished with a child.
++_currentChild;
// Keep elements of _dataMap that are in _seenMap.
DataMap::iterator it = _dataMap.begin();
while (it != _dataMap.end()) {
if (_seenMap.end() == _seenMap.find(it->first)) {
DataMap::iterator toErase = it;
++it;
// Update memory stats.
WorkingSetMember* member = _ws->get(toErase->second);
_memUsage -= member->getMemUsage();
_ws->free(toErase->second);
_dataMap.erase(toErase);
}
else { ++it; }
}
_specificStats.mapAfterChild.push_back(_dataMap.size());
_seenMap.clear();
// _dataMap is now the intersection of the first _currentChild nodes.
// If we have nothing to AND with after finishing any child, stop.
if (_dataMap.empty()) {
_hashingChildren = false;
return PlanStage::IS_EOF;
}
// We've finished scanning all children. Return results with the next call to work().
if (_currentChild == _children.size()) {
_hashingChildren = false;
}
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
else if (PlanStage::FAILURE == childStatus || PlanStage::DEAD == childStatus) {
*out = id;
// If a stage fails, it may create a status WSM to indicate why it
// failed, in which case 'id' is valid. If ID is invalid, we
// create our own error message.
if (WorkingSet::INVALID_ID == id) {
mongoutils::str::stream ss;
ss << "hashed AND stage failed to read in results from other child "
<< _currentChild;
Status status(ErrorCodes::InternalError, ss);
*out = WorkingSetCommon::allocateStatusMember( _ws, status);
}
return childStatus;
}
else {
if (PlanStage::NEED_TIME == childStatus) {
++_commonStats.needTime;
}
else if (PlanStage::NEED_YIELD == childStatus) {
++_commonStats.needYield;
*out = id;
}
return childStatus;
}
}
void AndHashStage::saveState() {
++_commonStats.yields;
for (size_t i = 0; i < _children.size(); ++i) {
_children[i]->saveState();
}
}
void AndHashStage::restoreState(OperationContext* opCtx) {
++_commonStats.unyields;
for (size_t i = 0; i < _children.size(); ++i) {
_children[i]->restoreState(opCtx);
}
}
void AndHashStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
++_commonStats.invalidates;
if (isEOF()) { return; }
for (size_t i = 0; i < _children.size(); ++i) {
_children[i]->invalidate(txn, dl, type);
}
// Invalidation can happen to our warmup results. If that occurs just
// flag it and forget about it.
for (size_t i = 0; i < _lookAheadResults.size(); ++i) {
if (WorkingSet::INVALID_ID != _lookAheadResults[i]) {
WorkingSetMember* member = _ws->get(_lookAheadResults[i]);
if (member->hasLoc() && member->loc == dl) {
WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
_ws->flagForReview(_lookAheadResults[i]);
_lookAheadResults[i] = WorkingSet::INVALID_ID;
}
}
}
// If it's a deletion, we have to forget about the RecordId, and since the AND-ing is by
// RecordId we can't continue processing it even with the object.
//
// If it's a mutation the predicates implied by the AND-ing may no longer be true.
//
// So, we flag and try to pick it up later.
DataMap::iterator it = _dataMap.find(dl);
if (_dataMap.end() != it) {
WorkingSetID id = it->second;
WorkingSetMember* member = _ws->get(id);
verify(member->loc == dl);
if (_hashingChildren) {
++_specificStats.flaggedInProgress;
}
else {
++_specificStats.flaggedButPassed;
}
// Update memory stats.
_memUsage -= member->getMemUsage();
// The loc is about to be invalidated. Fetch it and clear the loc.
WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
// Add the WSID to the to-be-reviewed list in the WS.
_ws->flagForReview(id);
// And don't return it from this stage.
_dataMap.erase(it);
}
}
vector AndHashStage::getChildren() const {
return _children;
}
PlanStageStats* AndHashStage::getStats() {
_commonStats.isEOF = isEOF();
_specificStats.memLimit = _maxMemUsage;
_specificStats.memUsage = _memUsage;
unique_ptr ret(new PlanStageStats(_commonStats, STAGE_AND_HASH));
ret->specific.reset(new AndHashStats(_specificStats));
for (size_t i = 0; i < _children.size(); ++i) {
ret->children.push_back(_children[i]->getStats());
}
return ret.release();
}
const CommonStats* AndHashStage::getCommonStats() const {
return &_commonStats;
}
const SpecificStats* AndHashStage::getSpecificStats() const {
return &_specificStats;
}
} // namespace mongo