/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/query/plan_executor.h"
#include
#include "mongo/db/catalog/collection.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
using boost::shared_ptr;
using std::string;
using std::vector;
namespace {

    /**
     * Searches the plan tree rooted at 'root', depth-first and parent-before-children,
     * for a stage whose stageType() equals 'type'.
     *
     * Returns the first such stage, or NULL if 'root' is NULL or no stage in the tree
     * has the requested type.
     */
    PlanStage* getStageByType(PlanStage* root, StageType type) {
        // Nothing to search in an empty tree.
        if (NULL == root) {
            return NULL;
        }

        if (root->stageType() == type) {
            return root;
        }

        // The element type has to be spelled out: a bare 'vector' does not name a type.
        const vector<PlanStage*> children = root->getChildren();
        for (size_t i = 0; i < children.size(); i++) {
            PlanStage* result = getStageByType(children[i], type);
            if (NULL != result) {
                return result;
            }
        }

        return NULL;
    }

}  // namespace
// static
Status PlanExecutor::make(OperationContext* opCtx,
                          WorkingSet* ws,
                          PlanStage* rt,
                          const Collection* collection,
                          YieldPolicy yieldPolicy,
                          PlanExecutor** out) {
    // Convenience overload: no query solution, no canonical query and no explicit
    // namespace string. Everything else is forwarded to the general overload as-is.
    QuerySolution* const noSolution = NULL;
    CanonicalQuery* const noQuery = NULL;
    const std::string unsetNs;
    return PlanExecutor::make(
        opCtx, ws, rt, noSolution, noQuery, collection, unsetNs, yieldPolicy, out);
}
// static
Status PlanExecutor::make(OperationContext* opCtx,
                          WorkingSet* ws,
                          PlanStage* rt,
                          const std::string& ns,
                          YieldPolicy yieldPolicy,
                          PlanExecutor** out) {
    // Convenience overload for callers that only have a namespace string: no query
    // solution, no canonical query and no collection are handed to the general overload.
    QuerySolution* const noSolution = NULL;
    CanonicalQuery* const noQuery = NULL;
    const Collection* const noCollection = NULL;
    return PlanExecutor::make(
        opCtx, ws, rt, noSolution, noQuery, noCollection, ns, yieldPolicy, out);
}
// static
Status PlanExecutor::make(OperationContext* opCtx,
                          WorkingSet* ws,
                          PlanStage* rt,
                          CanonicalQuery* cq,
                          const Collection* collection,
                          YieldPolicy yieldPolicy,
                          PlanExecutor** out) {
    // Convenience overload: a canonical query is supplied, but no query solution and
    // no explicit namespace string.
    QuerySolution* const noSolution = NULL;
    const std::string unsetNs;
    return PlanExecutor::make(
        opCtx, ws, rt, noSolution, cq, collection, unsetNs, yieldPolicy, out);
}
// static
Status PlanExecutor::make(OperationContext* opCtx,
                          WorkingSet* ws,
                          PlanStage* rt,
                          QuerySolution* qs,
                          CanonicalQuery* cq,
                          const Collection* collection,
                          YieldPolicy yieldPolicy,
                          PlanExecutor** out) {
    // Convenience overload: identical to the general overload except that no explicit
    // namespace string is supplied.
    const std::string unsetNs;
    return PlanExecutor::make(opCtx, ws, rt, qs, cq, collection, unsetNs, yieldPolicy, out);
}
// static
/**
 * General factory that every other make() overload forwards to.
 *
 * Constructs a PlanExecutor from the given pieces and then runs plan selection with
 * 'yieldPolicy'. On success the new executor is stored in '*out' and Status::OK() is
 * returned. If plan selection fails, its status is returned, the partially set-up
 * executor is destroyed here, and '*out' is left untouched.
 */
Status PlanExecutor::make(OperationContext* opCtx,
                          WorkingSet* ws,
                          PlanStage* rt,
                          QuerySolution* qs,
                          CanonicalQuery* cq,
                          const Collection* collection,
                          const std::string& ns,
                          YieldPolicy yieldPolicy,
                          PlanExecutor** out) {
    // Hold the executor in an owning pointer so that it is destroyed on the error path.
    // The pointee type must be named explicitly; a bare 'std::auto_ptr' is not a type.
    std::auto_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, ws, rt, qs, cq, collection, ns));

    // Perform plan selection, if necessary.
    Status status = exec->pickBestPlan(yieldPolicy);
    if (!status.isOK()) {
        return status;
    }

    // Hand ownership over to the caller only once everything has succeeded.
    *out = exec.release();
    return Status::OK();
}
/**
 * Stores the supplied pieces and starts out un-killed with a YIELD_MANUAL yield policy.
 *
 * If 'ns' is empty, the namespace is taken from 'collection' when one is given, and
 * otherwise from 'cq', which must then be non-NULL.
 */
PlanExecutor::PlanExecutor(OperationContext* opCtx,
                           WorkingSet* ws,
                           PlanStage* rt,
                           QuerySolution* qs,
                           CanonicalQuery* cq,
                           const Collection* collection,
                           const std::string& ns)
    : _opCtx(opCtx),
      _collection(collection),
      _cq(cq),
      _workingSet(ws),
      _qs(qs),
      _root(rt),
      _ns(ns),
      _killed(false),
      _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
    // An explicitly supplied namespace wins; only fill it in when the caller gave none.
    if (_ns.empty()) {
        if (_collection != NULL) {
            _ns = _collection->ns().ns();
        }
        else {
            // With neither a namespace nor a collection, the canonical query is the only
            // remaining source of the namespace.
            invariant(NULL != _cq.get());
            _ns = _cq->getParsed().ns();
        }
    }
}
/**
 * Applies 'policy' to this executor and then runs plan selection if the stage tree
 * contains a stage that needs it.
 *
 * A subplan stage takes precedence: if one is found, its pickBestPlan() result is
 * returned and no multi-plan stage is looked for. Otherwise the first multi-plan stage,
 * if any, performs the selection. Returns Status::OK() when neither stage is present.
 */
Status PlanExecutor::pickBestPlan(YieldPolicy policy) {
    // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
    // register it to receive notifications.
    this->setYieldPolicy(policy);

    // First check if we need to do subplanning.
    PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN);
    if (foundStage) {
        // The stage was located by its type tag, so the downcast target is named
        // explicitly (a static_cast without a target type is ill-formed).
        SubplanStage* subplan = static_cast<SubplanStage*>(foundStage);
        return subplan->pickBestPlan(_yieldPolicy.get());
    }

    // If we didn't have to do subplanning, we might still have to do regular
    // multi plan selection.
    foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN);
    if (foundStage) {
        MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage);
        return mps->pickBestPlan(_yieldPolicy.get());
    }

    // No plan selection was required, so there is nothing that could have failed.
    return Status::OK();
}
// Intentionally empty: no explicit cleanup is performed here. Whatever the members
// release happens in their own destructors.
PlanExecutor::~PlanExecutor() {}
// static
/**
 * Returns the name of the given ExecState as a string: "ADVANCED", "IS_EOF", "DEAD" or
 * "FAILURE". Any value other than the first three is verified to be FAILURE.
 */
std::string PlanExecutor::statestr(ExecState s) {
    switch (s) {
    case PlanExecutor::ADVANCED:
        return "ADVANCED";
    case PlanExecutor::IS_EOF:
        return "IS_EOF";
    case PlanExecutor::DEAD:
        return "DEAD";
    default:
        // FAILURE is the only state left; anything else is a programming error.
        verify(PlanExecutor::FAILURE == s);
        return "FAILURE";
    }
}
// Returns the working set held by this executor (the pointer stored in _workingSet).
WorkingSet* PlanExecutor::getWorkingSet() const {
    WorkingSet* const workingSet = _workingSet.get();
    return workingSet;
}
// Returns the root of the stage tree this executor drives (the pointer stored in _root).
PlanStage* PlanExecutor::getRootStage() const {
    PlanStage* const rootStage = _root.get();
    return rootStage;
}
// Returns the canonical query held by this executor. May be NULL: several make()
// overloads construct an executor without one.
CanonicalQuery* PlanExecutor::getCanonicalQuery() const {
    CanonicalQuery* const query = _cq.get();
    return query;
}
// Returns the stats reported by the root stage's getStats().
// NOTE(review): ownership of the returned object is decided by PlanStage::getStats();
// confirm there before storing or deleting the result.
PlanStageStats* PlanExecutor::getStats() const {
    PlanStageStats* const rootStats = _root->getStats();
    return rootStats;
}
// Returns the collection this executor was built over. NULL if none was supplied at
// construction, and always NULL once kill() has run.
const Collection* PlanExecutor::collection() const {
    const Collection* const current = _collection;
    return current;
}
// Returns the operation context currently attached to this executor. NULL between
// saveState() and the next successful restore, since saveState() clears it.
OperationContext* PlanExecutor::getOpCtx() const {
    OperationContext* const current = _opCtx;
    return current;
}
void PlanExecutor::saveState() {
if (!_killed) {
_root->saveState();
}
// Doc-locking storage engines drop their transactional context after saving state.
// The query stages inside this stage tree might buffer record ids (e.g. text, geoNear,
// mergeSort, sort) which are no longer protected by the storage engine's transactional
// boundaries. Force-fetch the documents for any such record ids so that we have our
// own copy in the working set.
if (supportsDocLocking()) {
WorkingSetCommon::prepareForSnapshotChange(_workingSet.get());
}
_opCtx = NULL;
}
/**
 * Restores the executor after a yield, attaching 'opCtx'.
 *
 * Returns the result of restoreStateWithoutRetrying(). If that throws a
 * WriteConflictException and the yield policy allows yielding, the retry is delegated to
 * the yield policy and its result is returned instead; otherwise the exception is
 * rethrown to the caller.
 */
bool PlanExecutor::restoreState(OperationContext* opCtx) {
    try {
        return restoreStateWithoutRetrying(opCtx);
    }
    catch (const WriteConflictException&) {
        if (_yieldPolicy->allowedToYield()) {
            // Handles retries by calling restoreStateWithoutRetrying() in a loop.
            return _yieldPolicy->yield(NULL);
        }
        throw;
    }
}
/**
 * Single restore attempt with no WriteConflictException handling.
 *
 * Requires that no operation context is currently attached and that 'opCtx' is
 * non-NULL. Attaches 'opCtx', resets the yield timer and, unless the executor has been
 * killed, restores the stage tree. Returns false if the executor has been killed, true
 * otherwise.
 */
bool PlanExecutor::restoreStateWithoutRetrying(OperationContext* opCtx) {
    invariant(NULL == _opCtx);
    invariant(opCtx);
    _opCtx = opCtx;

    // We have just come back from a yield or getMore; restart the yield timer so that a
    // yielding executor does not immediately yield again.
    _yieldPolicy->resetTimer();

    if (_killed) {
        return false;
    }

    _root->restoreState(opCtx);
    return true;
}
// Forwards an invalidation of record 'dl' to the root of the stage tree. Does nothing
// once the executor has been killed.
void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
    if (_killed) {
        return;
    }
    _root->invalidate(txn, dl, type);
}
/**
 * Produces the next result as a plain BSONObj by delegating to getNextSnapshotted().
 *
 * 'objOut' and 'dlOut' may each be NULL if the caller does not want that piece. When
 * 'objOut' is non-NULL it is assigned on every call, whatever state is returned; if
 * getNextSnapshotted() produced no object, it receives the value of a
 * default-constructed snapshotted object.
 */
PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
    // The snapshotted wrapper has to name the type it wraps; here that is the BSONObj
    // being handed back through 'objOut'.
    Snapshotted<BSONObj> snapshotted;
    ExecState state = getNextSnapshotted(objOut ? &snapshotted : NULL, dlOut);

    if (objOut) {
        *objOut = snapshotted.value();
    }

    return state;
}
/**
 * Drives the stage tree until it produces a result the caller can use, reaches EOF,
 * dies or fails.
 *
 * 'objOut' and 'dlOut' may each be NULL if the caller does not want that piece. A result
 * that lacks a requested piece is discarded and the tree is worked again.
 *
 * Returns:
 *   ADVANCED - the requested outputs have been filled in from the next result.
 *   IS_EOF   - the root stage reported EOF.
 *   DEAD     - the executor was killed (before or during the call) or the root stage
 *              reported DEAD.
 *   FAILURE  - the root stage failed; if 'objOut' is non-NULL it receives the status
 *              object describing the failure.
 *
 * Throws WriteConflictException if a stage asks for a write-conflict retry while the
 * yield policy does not allow yielding.
 */
PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
                                                         RecordId* dlOut) {
    if (_killed) {
        return PlanExecutor::DEAD;
    }

    // When a stage requests a yield for document fetch, it gives us back a RecordFetcher*
    // to use to pull the record into memory. We take ownership of the RecordFetcher here,
    // deleting it after we've had a chance to do the fetch. For timing-based yields, we
    // just pass a NULL fetcher.
    boost::scoped_ptr<RecordFetcher> fetcher;

    // Incremented on every writeConflict, reset to 0 on any successful call to _root->work.
    size_t writeConflictsInARow = 0;

    for (;;) {
        // These are the conditions which can cause us to yield:
        //   1) The yield policy's timer elapsed, or
        //   2) some stage requested a yield due to a document fetch, or
        //   3) we need to yield and retry due to a WriteConflictException.
        // In all cases, the actual yielding happens here.
        if (_yieldPolicy->shouldYield()) {
            _yieldPolicy->yield(fetcher.get());

            if (_killed) {
                return PlanExecutor::DEAD;
            }
        }

        // We're done using the fetcher, so it should be freed. We don't want to
        // use the same RecordFetcher twice.
        fetcher.reset();

        WorkingSetID id = WorkingSet::INVALID_ID;
        PlanStage::StageState code = _root->work(&id);

        if (code != PlanStage::NEED_YIELD) {
            writeConflictsInARow = 0;
        }

        if (PlanStage::ADVANCED == code) {
            // Fast count: the stage advanced without producing a working set member,
            // which is only legal when the caller asked for no output at all.
            if (WorkingSet::INVALID_ID == id) {
                invariant(NULL == objOut);
                invariant(NULL == dlOut);
                return PlanExecutor::ADVANCED;
            }

            WorkingSetMember* member = _workingSet->get(id);
            bool hasRequestedData = true;

            if (NULL != objOut) {
                if (WorkingSetMember::LOC_AND_IDX == member->state) {
                    if (1 != member->keyData.size()) {
                        hasRequestedData = false;
                    }
                    else {
                        // TODO: currently snapshot ids are only associated with documents,
                        // and not with index keys.
                        *objOut = Snapshotted<BSONObj>(SnapshotId(),
                                                       member->keyData[0].keyData);
                    }
                }
                else if (member->hasObj()) {
                    *objOut = member->obj;
                }
                else {
                    hasRequestedData = false;
                }
            }

            // Only look for the record id if the result has not already been rejected.
            if (hasRequestedData && NULL != dlOut) {
                if (member->hasLoc()) {
                    *dlOut = member->loc;
                }
                else {
                    hasRequestedData = false;
                }
            }

            // Release the member exactly once, and only after every read of 'member'
            // above. Freeing inside the individual branches could release the same id
            // twice and inspect the member after it had been freed.
            _workingSet->free(id);

            if (hasRequestedData) {
                return PlanExecutor::ADVANCED;
            }
            // This result didn't have the data the caller wanted, try again.
        }
        else if (PlanStage::NEED_YIELD == code) {
            if (id == WorkingSet::INVALID_ID) {
                // No member came back with the yield request: this is a write-conflict
                // retry rather than a document fetch.
                if (!_yieldPolicy->allowedToYield()) throw WriteConflictException();
                _opCtx->getCurOp()->debug().writeConflicts++;
                writeConflictsInARow++;
                // Use the namespace fixed at construction: _collection is NULL for
                // executors built from a namespace string only, while _ns is always set.
                WriteConflictException::logAndBackoff(writeConflictsInARow,
                                                      "plan execution",
                                                      _ns);
            }
            else {
                WorkingSetMember* member = _workingSet->get(id);
                invariant(member->hasFetcher());
                // Transfer ownership of the fetcher. Next time around the loop a yield will
                // happen.
                fetcher.reset(member->releaseFetcher());
            }

            // If we're allowed to, we will yield next time through the loop.
            if (_yieldPolicy->allowedToYield()) _yieldPolicy->forceYield();
        }
        else if (PlanStage::NEED_TIME == code) {
            // Nothing to do here; loop around to the yield check and work the tree again.
        }
        else if (PlanStage::IS_EOF == code) {
            return PlanExecutor::IS_EOF;
        }
        else if (PlanStage::DEAD == code) {
            return PlanExecutor::DEAD;
        }
        else {
            verify(PlanStage::FAILURE == code);

            if (NULL != objOut) {
                BSONObj statusObj;
                WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &statusObj);
                *objOut = Snapshotted<BSONObj>(SnapshotId(), statusObj);
            }

            return PlanExecutor::FAILURE;
        }
    }
}
// True if the executor has been killed or the root stage reports EOF. The root stage is
// not consulted once the executor is killed.
bool PlanExecutor::isEOF() {
    if (_killed) {
        return true;
    }
    return _root->isEOF();
}
void PlanExecutor::registerExec() {
_safety.reset(new ScopedExecutorRegistration(this));
}
// Drops the registration held in _safety, if any, by resetting it to empty.
void PlanExecutor::deregisterExec() {
    _safety.reset();
}
void PlanExecutor::kill() {
_killed = true;
_collection = NULL;
// XXX: PlanExecutor is designed to wrap a single execution tree. In the case of
// aggregation queries, PlanExecutor wraps a proxy stage responsible for pulling results
// from an aggregation pipeline. The aggregation pipeline pulls results from yet another
// PlanExecutor. Such nested PlanExecutors require us to manually propagate kill() to
// the "inner" executor. This is bad, and hopefully can be fixed down the line with the
// unification of agg and query.
//
// The CachedPlanStage is another special case. It needs to update the plan cache from
// its destructor. It needs to know whether it has been killed so that it can avoid
// touching a potentially invalid plan cache in this case.
//
// TODO: get rid of this code block.
{
PlanStage* foundStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY);
if (foundStage) {
PipelineProxyStage* proxyStage = static_cast(foundStage);
shared_ptr childExec = proxyStage->getChildExecutor();
if (childExec) {
childExec->kill();
}
}
foundStage = getStageByType(_root.get(), STAGE_CACHED_PLAN);
if (foundStage) {
CachedPlanStage* cacheStage = static_cast(foundStage);
cacheStage->kill();
}
}
}
/**
 * Runs the plan to completion, discarding every result.
 *
 * Returns Status::OK() if the plan reached EOF, and an OperationFailed status if the
 * executor was killed or the plan failed. For a failure, the message includes the
 * string form of the last object returned by getNext().
 */
Status PlanExecutor::executePlan() {
    BSONObj lastObj;
    PlanExecutor::ExecState state;

    // Pull results until something other than ADVANCED comes back.
    do {
        state = this->getNext(&lastObj, NULL);
    } while (PlanExecutor::ADVANCED == state);

    switch (state) {
    case PlanExecutor::DEAD:
        return Status(ErrorCodes::OperationFailed, "Exec error: PlanExecutor killed");
    case PlanExecutor::FAILURE:
        return Status(ErrorCodes::OperationFailed,
                      str::stream() << "Exec error: "
                                    << WorkingSetCommon::toStatusString(lastObj));
    default:
        break;
    }

    // EOF is the only remaining way for the loop above to have ended.
    invariant(PlanExecutor::IS_EOF == state);
    return Status::OK();
}
// Returns the namespace string fixed at construction. The reference is to a member, so
// it is only valid while this executor is alive.
const string& PlanExecutor::ns() {
    const string& nsRef = _ns;
    return nsRef;
}
/**
 * Switches the executor's yield policy to 'policy'. If the new policy is YIELD_AUTO and
 * 'registerExecutor' is true, the executor is also registered via registerExec().
 */
void PlanExecutor::setYieldPolicy(YieldPolicy policy, bool registerExecutor) {
    _yieldPolicy->setPolicy(policy);

    // Executors that yield automatically generally need to be registered so that, after
    // yielding, they receive notifications of events like deletions and index drops.
    // Callers pass registerExecutor == false for the few executors that get registered
    // by ClientCursor instead. Registration is unneeded if we only do partial "yields"
    // for WriteConflict retrying.
    const bool autoYielding = (PlanExecutor::YIELD_AUTO == policy);
    if (autoYielding && registerExecutor) {
        this->registerExec();
    }
}
//
// ScopedExecutorRegistration
//
// Registers 'exec' with the cursor manager of its collection, if it has a collection.
PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec)
    : _exec(exec) {
    // Collection can be null for an EOFStage plan, or other places where registration
    // is not needed.
    const Collection* const coll = _exec->collection();
    if (coll) {
        coll->getCursorManager()->registerExecutor(exec);
    }
}
// Deregisters the executor from the cursor manager of its collection. Nothing is done if
// the executor has no collection at this point, which is also the case after kill().
PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() {
    const Collection* const coll = _exec->collection();
    if (coll) {
        coll->getCursorManager()->deregisterExecutor(_exec);
    }
}
} // namespace mongo