/**
 * Copyright (C) 2013 10gen Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/query/plan_executor.h"

#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/stacktrace.h"

namespace mongo {

using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;

using stdx::make_unique;

namespace {
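// When the 'planExecutorAlwaysDead' fail point is enabled, getNextImpl() below deregisters the
// executor and marks it as killed (optionally only when the fail point's "namespace" argument
// matches this executor's namespace), forcing callers through the DEAD-state code paths.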
namespace {
MONGO_FP_DECLARE(planExecutorAlwaysDead);
}  // namespace

/**
 * Retrieves the first stage of a given type from the plan tree, or NULL
 * if no such stage is found.
 */
PlanStage* getStageByType(PlanStage* root, StageType type) {
    if (root->stageType() == type) {
        return root;
    }

    const auto& children = root->getChildren();
    for (size_t i = 0; i < children.size(); i++) {
        PlanStage* result = getStageByType(children[i].get(), type);
        if (result) {
            return result;
        }
    }

    return NULL;
}
}  // namespace

// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
                                                        unique_ptr<WorkingSet> ws,
                                                        unique_ptr<PlanStage> rt,
                                                        const Collection* collection,
                                                        YieldPolicy yieldPolicy) {
    return PlanExecutor::make(
        opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, "", yieldPolicy);
}

// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
                                                        unique_ptr<WorkingSet> ws,
                                                        unique_ptr<PlanStage> rt,
                                                        const string& ns,
                                                        YieldPolicy yieldPolicy) {
    return PlanExecutor::make(
        opCtx, std::move(ws), std::move(rt), nullptr, nullptr, nullptr, ns, yieldPolicy);
}

// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
                                                        unique_ptr<WorkingSet> ws,
                                                        unique_ptr<PlanStage> rt,
                                                        unique_ptr<CanonicalQuery> cq,
                                                        const Collection* collection,
                                                        YieldPolicy yieldPolicy) {
    return PlanExecutor::make(
        opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, "", yieldPolicy);
}

// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
                                                        unique_ptr<WorkingSet> ws,
                                                        unique_ptr<PlanStage> rt,
                                                        unique_ptr<QuerySolution> qs,
                                                        unique_ptr<CanonicalQuery> cq,
                                                        const Collection* collection,
                                                        YieldPolicy yieldPolicy) {
    return PlanExecutor::make(opCtx,
                              std::move(ws),
                              std::move(rt),
                              std::move(qs),
                              std::move(cq),
                              collection,
                              "",
                              yieldPolicy);
}

// static
StatusWith<unique_ptr<PlanExecutor>> PlanExecutor::make(OperationContext* opCtx,
                                                        unique_ptr<WorkingSet> ws,
                                                        unique_ptr<PlanStage> rt,
                                                        unique_ptr<QuerySolution> qs,
                                                        unique_ptr<CanonicalQuery> cq,
                                                        const Collection* collection,
                                                        const string& ns,
                                                        YieldPolicy yieldPolicy) {
    unique_ptr<PlanExecutor> exec(new PlanExecutor(
        opCtx, std::move(ws), std::move(rt), std::move(qs), std::move(cq), collection, ns));

    // Perform plan selection, if necessary.
    Status status = exec->pickBestPlan(yieldPolicy, collection);
    if (!status.isOK()) {
        return status;
    }

    return std::move(exec);
}

PlanExecutor::PlanExecutor(OperationContext* opCtx,
                           unique_ptr<WorkingSet> ws,
                           unique_ptr<PlanStage> rt,
                           unique_ptr<QuerySolution> qs,
                           unique_ptr<CanonicalQuery> cq,
                           const Collection* collection,
                           const string& ns)
    : _opCtx(opCtx),
      _cq(std::move(cq)),
      _workingSet(std::move(ws)),
      _qs(std::move(qs)),
      _root(std::move(rt)),
      _ns(ns),
      _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
    // We may still need to initialize _ns from either collection or _cq.
    if (!_ns.empty()) {
        // We already have an _ns set, so there's nothing more to do.
        return;
    }

    if (collection) {
        _ns = collection->ns().ns();
    } else {
        invariant(_cq);
        _ns = _cq->getQueryRequest().ns();
    }
}

Status PlanExecutor::pickBestPlan(YieldPolicy policy, const Collection* collection) {
    invariant(_currentState == kUsable);

    // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
    // register it to receive notifications.
    this->setYieldPolicy(policy, collection);

    // First check if we need to do subplanning.
    PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN);
    if (foundStage) {
        SubplanStage* subplan = static_cast<SubplanStage*>(foundStage);
        return subplan->pickBestPlan(_yieldPolicy.get());
    }

    // If we didn't have to do subplanning, we might still have to do regular
    // multi plan selection...
    foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN);
    if (foundStage) {
        MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage);
        return mps->pickBestPlan(_yieldPolicy.get());
    }

    // ...or, we might have to run a plan from the cache for a trial period, falling back on
    // regular planning if the cached plan performs poorly.
    foundStage = getStageByType(_root.get(), STAGE_CACHED_PLAN);
    if (foundStage) {
        CachedPlanStage* cachedPlan = static_cast<CachedPlanStage*>(foundStage);
        return cachedPlan->pickBestPlan(_yieldPolicy.get());
    }

    // Either we chose a plan, or no plan selection was required. In both cases,
    // our work has been successfully completed.
    return Status::OK();
}

PlanExecutor::~PlanExecutor() {}

// static
string PlanExecutor::statestr(ExecState s) {
    if (PlanExecutor::ADVANCED == s) {
        return "ADVANCED";
    } else if (PlanExecutor::IS_EOF == s) {
        return "IS_EOF";
    } else if (PlanExecutor::DEAD == s) {
        return "DEAD";
    } else {
        verify(PlanExecutor::FAILURE == s);
        return "FAILURE";
    }
}

WorkingSet* PlanExecutor::getWorkingSet() const {
    return _workingSet.get();
}

PlanStage* PlanExecutor::getRootStage() const {
    return _root.get();
}

CanonicalQuery* PlanExecutor::getCanonicalQuery() const {
    return _cq.get();
}

unique_ptr<PlanStageStats> PlanExecutor::getStats() const {
    return _root->getStats();
}

BSONObjSet PlanExecutor::getOutputSorts() const {
    if (_qs && _qs->root) {
        _qs->root->computeProperties();
        return _qs->root->getSort();
    }

    if (_root->stageType() == STAGE_MULTI_PLAN) {
        // If we needed a MultiPlanStage, the PlanExecutor does not own the QuerySolution. We
        // must go through the MultiPlanStage to access the output sort.
        auto multiPlanStage = static_cast<MultiPlanStage*>(_root.get());
        if (multiPlanStage->bestSolution()) {
            multiPlanStage->bestSolution()->root->computeProperties();
            return multiPlanStage->bestSolution()->root->getSort();
        }
    } else if (_root->stageType() == STAGE_SUBPLAN) {
        auto subplanStage = static_cast<SubplanStage*>(_root.get());
        if (subplanStage->compositeSolution()) {
            subplanStage->compositeSolution()->root->computeProperties();
            return subplanStage->compositeSolution()->root->getSort();
        }
    }

    return SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}

OperationContext* PlanExecutor::getOpCtx() const {
    return _opCtx;
}

void PlanExecutor::saveState() {
    invariant(_currentState == kUsable || _currentState == kSaved);

    // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear,
    // mergeSort, sort) which are no longer protected by the storage engine's transactional
    // boundaries.
    WorkingSetCommon::prepareForSnapshotChange(_workingSet.get());

    if (!killed()) {
        _root->saveState();
    }
    _currentState = kSaved;
}

bool PlanExecutor::restoreState() {
    try {
        return restoreStateWithoutRetrying();
    } catch (const WriteConflictException& wce) {
        if (!_yieldPolicy->allowedToYield())
            throw;

        // Handles retries by calling restoreStateWithoutRetrying() in a loop.
        return _yieldPolicy->yield();
    }
}

bool PlanExecutor::restoreStateWithoutRetrying() {
    invariant(_currentState == kSaved);

    if (!killed()) {
        _root->restoreState();
    }

    _currentState = kUsable;
    return !killed();
}

void PlanExecutor::detachFromOperationContext() {
    invariant(_currentState == kSaved);
    _opCtx = nullptr;
    _root->detachFromOperationContext();
    _currentState = kDetached;
    _everDetachedFromOperationContext = true;
}

void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) {
    invariant(_currentState == kDetached);

    // We're reattaching for a getMore now. Reset the yield timer in order to prevent from
    // yielding again right away.
    _yieldPolicy->resetTimer();

    _opCtx = opCtx;
    _root->reattachToOperationContext(opCtx);
    _currentState = kSaved;
}

void PlanExecutor::invalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) {
    if (!killed()) {
        _root->invalidate(opCtx, dl, type);
    }
}

PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
    Snapshotted<BSONObj> snapshotted;
    ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut);

    if (objOut) {
        *objOut = snapshotted.value();
    }

    return state;
}

PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
                                                         RecordId* dlOut) {
    // Detaching from the OperationContext means that the returned snapshot ids could be invalid.
    invariant(!_everDetachedFromOperationContext);
    return getNextImpl(objOut, dlOut);
}

PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
    MONGO_FAIL_POINT_BLOCK(planExecutorAlwaysDead, customKill) {
        const BSONObj& data = customKill.getData();
        BSONElement customKillNS = data["namespace"];
        if (!customKillNS || _ns == customKillNS.str()) {
            deregisterExec();
            kill("hit planExecutorAlwaysDead fail point");
        }
    }

    invariant(_currentState == kUsable);
    if (killed()) {
        if (NULL != objOut) {
            Status status(ErrorCodes::OperationFailed,
                          str::stream() << "Operation aborted because: " << *_killReason);
            *objOut = Snapshotted<BSONObj>(SnapshotId(),
                                           WorkingSetCommon::buildMemberStatusObject(status));
        }
        return PlanExecutor::DEAD;
    }

    if (!_stash.empty()) {
        invariant(objOut && !dlOut);
        *objOut = {SnapshotId(), _stash.front()};
        _stash.pop();
        return PlanExecutor::ADVANCED;
    }

    // When a stage requests a yield for document fetch, it gives us back a RecordFetcher*
    // to use to pull the record into memory. We take ownership of the RecordFetcher here,
    // deleting it after we've had a chance to do the fetch. For timing-based yields, we
    // just pass a NULL fetcher.
    unique_ptr<RecordFetcher> fetcher;

    // Incremented on every writeConflict, reset to 0 on any successful call to _root->work.
    size_t writeConflictsInARow = 0;

    for (;;) {
        // These are the conditions which can cause us to yield:
        //   1) The yield policy's timer elapsed, or
        //   2) some stage requested a yield due to a document fetch, or
        //   3) we need to yield and retry due to a WriteConflictException.
        // In all cases, the actual yielding happens here.
        if (_yieldPolicy->shouldYield()) {
            if (!_yieldPolicy->yield(fetcher.get())) {
                // A return of false from a yield should only happen if we've been killed during
                // the yield.
                invariant(killed());

                if (NULL != objOut) {
                    Status status(ErrorCodes::OperationFailed,
                                  str::stream() << "Operation aborted because: " << *_killReason);
                    *objOut = Snapshotted<BSONObj>(
                        SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
                }
                return PlanExecutor::DEAD;
            }
        }

        // We're done using the fetcher, so it should be freed. We don't want to
        // use the same RecordFetcher twice.
        fetcher.reset();

        WorkingSetID id = WorkingSet::INVALID_ID;
        PlanStage::StageState code = _root->work(&id);

        if (code != PlanStage::NEED_YIELD)
            writeConflictsInARow = 0;

        if (PlanStage::ADVANCED == code) {
            WorkingSetMember* member = _workingSet->get(id);
            bool hasRequestedData = true;

            if (NULL != objOut) {
                if (WorkingSetMember::RID_AND_IDX == member->getState()) {
                    if (1 != member->keyData.size()) {
                        _workingSet->free(id);
                        hasRequestedData = false;
                    } else {
                        // TODO: currently snapshot ids are only associated with documents, and
                        // not with index keys.
                        *objOut = Snapshotted<BSONObj>(SnapshotId(), member->keyData[0].keyData);
                    }
                } else if (member->hasObj()) {
                    *objOut = member->obj;
                } else {
                    _workingSet->free(id);
                    hasRequestedData = false;
                }
            }

            if (NULL != dlOut) {
                if (member->hasRecordId()) {
                    *dlOut = member->recordId;
                } else {
                    _workingSet->free(id);
                    hasRequestedData = false;
                }
            }

            if (hasRequestedData) {
                _workingSet->free(id);
                return PlanExecutor::ADVANCED;
            }
            // This result didn't have the data the caller wanted, try again.
        } else if (PlanStage::NEED_YIELD == code) {
            if (id == WorkingSet::INVALID_ID) {
                if (!_yieldPolicy->allowedToYield())
                    throw WriteConflictException();
                CurOp::get(_opCtx)->debug().writeConflicts++;
                writeConflictsInARow++;
                WriteConflictException::logAndBackoff(
                    writeConflictsInARow, "plan execution", _ns);
            } else {
                WorkingSetMember* member = _workingSet->get(id);
                invariant(member->hasFetcher());
                // Transfer ownership of the fetcher. Next time around the loop a yield will
                // happen.
                fetcher.reset(member->releaseFetcher());
            }

            // If we're allowed to, we will yield next time through the loop.
            if (_yieldPolicy->allowedToYield())
                _yieldPolicy->forceYield();
        } else if (PlanStage::NEED_TIME == code) {
            // Fall through to yield check at end of large conditional.
        } else if (PlanStage::IS_EOF == code) {
            return PlanExecutor::IS_EOF;
        } else {
            invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);

            if (NULL != objOut) {
                BSONObj statusObj;
                WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &statusObj);
                *objOut = Snapshotted<BSONObj>(SnapshotId(), statusObj);
            }

            return (PlanStage::DEAD == code) ? PlanExecutor::DEAD : PlanExecutor::FAILURE;
        }
    }
}

bool PlanExecutor::isEOF() {
    invariant(_currentState == kUsable);
    return killed() || (_stash.empty() && _root->isEOF());
}

void PlanExecutor::registerExec(const Collection* collection) {
    // There's no need to register a PlanExecutor for which the underlying collection
    // doesn't exist.
    if (collection) {
        _safety.reset(new ScopedExecutorRegistration(this, collection));
    }
}

void PlanExecutor::deregisterExec() {
    _safety.reset();
}

void PlanExecutor::kill(string reason) {
    _killReason = std::move(reason);
}

Status PlanExecutor::executePlan() {
    invariant(_currentState == kUsable);
    BSONObj obj;
    PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
    while (PlanExecutor::ADVANCED == state) {
        state = this->getNext(&obj, NULL);
    }

    if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) {
        if (killed()) {
            return Status(ErrorCodes::QueryPlanKilled,
                          str::stream() << "Operation aborted because: " << *_killReason);
        }

        return Status(ErrorCodes::OperationFailed,
                      str::stream() << "Exec error: " << WorkingSetCommon::toStatusString(obj)
                                    << ", state: " << PlanExecutor::statestr(state));
    }

    invariant(!killed());
    invariant(PlanExecutor::IS_EOF == state);
    return Status::OK();
}

const string& PlanExecutor::ns() {
    return _ns;
}

void PlanExecutor::setYieldPolicy(YieldPolicy policy,
                                  const Collection* collection,
                                  bool registerExecutor) {
    if (!collection) {
        // If the collection doesn't exist, then there's no need to yield at all.
        invariant(!_yieldPolicy->allowedToYield());
        return;
    }

    _yieldPolicy->setPolicy(policy);
    if (PlanExecutor::YIELD_AUTO == policy) {
        // Runners that yield automatically generally need to be registered so that
        // after yielding, they receive notifications of events like deletions and
        // index drops. The only exception is that a few PlanExecutors get registered
        // by ClientCursor instead of being registered here. This is unneeded if we only do
        // partial "yields" for WriteConflict retrying.
        if (registerExecutor) {
            this->registerExec(collection);
        }
    }
}

void PlanExecutor::enqueue(const BSONObj& obj) {
    _stash.push(obj.getOwned());
}

//
// ScopedExecutorRegistration
//

// PlanExecutor::ScopedExecutorRegistration
PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec,
                                                                     const Collection* collection)
    : _exec(exec), _collection(collection) {
    invariant(_collection);
    _collection->getCursorManager()->registerExecutor(_exec);
}

PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() {
    if (_exec->killed()) {
        // If the plan executor has been killed, then it's possible that the collection
        // no longer exists.
        return;
    }
    _collection->getCursorManager()->deregisterExecutor(_exec);
}

}  // namespace mongo