diff options
author | Benety Goh <benety@mongodb.com> | 2018-10-25 13:37:44 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-10-25 13:37:44 -0400 |
commit | 7d8df4c73934ef757e4007743ab6b69e3e80b7c2 (patch) | |
tree | c2711ed6a864b24c7c4c0de1b8c5347a87388244 /src/mongo/db/query/plan_executor.cpp | |
parent | b9db97a9ec5644c00db39a02813e82daa814b692 (diff) | |
download | mongo-7d8df4c73934ef757e4007743ab6b69e3e80b7c2.tar.gz |
SERVER-37589 split PlanExecutor into interface and implementation
Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 723 |
1 files changed, 0 insertions, 723 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp deleted file mode 100644 index 111fa4b376f..00000000000 --- a/src/mongo/db/query/plan_executor.cpp +++ /dev/null @@ -1,723 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - -#include "mongo/platform/basic.h" - -#include "mongo/db/query/plan_executor.h" - -#include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" -#include "mongo/db/exec/cached_plan.h" -#include "mongo/db/exec/collection_scan.h" -#include "mongo/db/exec/multi_plan.h" -#include "mongo/db/exec/pipeline_proxy.h" -#include "mongo/db/exec/plan_stage.h" -#include "mongo/db/exec/plan_stats.h" -#include "mongo/db/exec/subplan.h" -#include "mongo/db/exec/working_set.h" -#include "mongo/db/exec/working_set_common.h" -#include "mongo/db/query/find_common.h" -#include "mongo/db/query/mock_yield_policies.h" -#include "mongo/db/query/plan_yield_policy.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/service_context.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/stacktrace.h" - -namespace mongo { - -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using std::vector; - -const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime = - OperationContext::declareDecoration<repl::OpTime>(); - -struct CappedInsertNotifierData { - shared_ptr<CappedInsertNotifier> notifier; - uint64_t lastEOFVersion = ~0; -}; - -namespace { - -MONGO_FAIL_POINT_DEFINE(planExecutorAlwaysFails); -MONGO_FAIL_POINT_DEFINE(planExecutorHangBeforeShouldWaitForInserts); - -/** - * Constructs a PlanYieldPolicy based on 'policy'. - */ -std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutor* exec, - PlanExecutor::YieldPolicy policy) { - switch (policy) { - case PlanExecutor::YieldPolicy::YIELD_AUTO: - case PlanExecutor::YieldPolicy::YIELD_MANUAL: - case PlanExecutor::YieldPolicy::NO_YIELD: - case PlanExecutor::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY: - case PlanExecutor::YieldPolicy::INTERRUPT_ONLY: { - return stdx::make_unique<PlanYieldPolicy>(exec, policy); - } - case PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT: { - return stdx::make_unique<AlwaysTimeOutYieldPolicy>(exec); - } - case PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED: { - return stdx::make_unique<AlwaysPlanKilledYieldPolicy>(exec); - } - default: - MONGO_UNREACHABLE; - } -} - -/** - * Retrieves the first stage of a given type from the plan tree, or NULL - * if no such stage is found. - */ -PlanStage* getStageByType(PlanStage* root, StageType type) { - if (root->stageType() == type) { - return root; - } - - const auto& children = root->getChildren(); - for (size_t i = 0; i < children.size(); i++) { - PlanStage* result = getStageByType(children[i].get(), type); - if (result) { - return result; - } - } - - return NULL; -} -} // namespace - -// static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - const Collection* collection, - YieldPolicy yieldPolicy) { - return PlanExecutor::make( - opCtx, std::move(ws), std::move(rt), nullptr, nullptr, collection, {}, yieldPolicy); -} - -// static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - NamespaceString nss, - YieldPolicy yieldPolicy) { - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(rt), - nullptr, - nullptr, - nullptr, - std::move(nss), - yieldPolicy); -} - -// static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy) { - return PlanExecutor::make( - opCtx, std::move(ws), std::move(rt), nullptr, std::move(cq), collection, {}, yieldPolicy); -} - -// static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - YieldPolicy yieldPolicy) { - return PlanExecutor::make(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - collection, - {}, - yieldPolicy); -} - -// static -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy) { - - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec( - new PlanExecutor(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - collection, - std::move(nss), - yieldPolicy), - PlanExecutor::Deleter(opCtx, collection)); - - // Perform plan selection, if necessary. - Status status = exec->pickBestPlan(collection); - if (!status.isOK()) { - return status; - } - - return std::move(exec); -} - -PlanExecutor::PlanExecutor(OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const Collection* collection, - NamespaceString nss, - YieldPolicy yieldPolicy) - : _opCtx(opCtx), - _cq(std::move(cq)), - _workingSet(std::move(ws)), - _qs(std::move(qs)), - _root(std::move(rt)), - _nss(std::move(nss)), - // There's no point in yielding if the collection doesn't exist. - _yieldPolicy(makeYieldPolicy(this, collection ? yieldPolicy : NO_YIELD)) { - // We may still need to initialize _nss from either collection or _cq. - if (!_nss.isEmpty()) { - return; // We already have an _nss set, so there's nothing more to do. - } - - if (collection) { - _nss = collection->ns(); - if (_yieldPolicy->canReleaseLocksDuringExecution()) { - _registrationToken = collection->getCursorManager()->registerExecutor(this); - } - } else { - invariant(_cq); - _nss = _cq->getQueryRequest().nss(); - } -} - -Status PlanExecutor::pickBestPlan(const Collection* collection) { - invariant(_currentState == kUsable); - - // First check if we need to do subplanning. - PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN); - if (foundStage) { - SubplanStage* subplan = static_cast<SubplanStage*>(foundStage); - return subplan->pickBestPlan(_yieldPolicy.get()); - } - - // If we didn't have to do subplanning, we might still have to do regular - // multi plan selection... - foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN); - if (foundStage) { - MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage); - return mps->pickBestPlan(_yieldPolicy.get()); - } - - // ...or, we might have to run a plan from the cache for a trial period, falling back on - // regular planning if the cached plan performs poorly. - foundStage = getStageByType(_root.get(), STAGE_CACHED_PLAN); - if (foundStage) { - CachedPlanStage* cachedPlan = static_cast<CachedPlanStage*>(foundStage); - return cachedPlan->pickBestPlan(_yieldPolicy.get()); - } - - // Either we chose a plan, or no plan selection was required. In both cases, - // our work has been successfully completed. - return Status::OK(); -} - -PlanExecutor::~PlanExecutor() { - invariant(_currentState == kDisposed); -} - -// static -string PlanExecutor::statestr(ExecState s) { - if (PlanExecutor::ADVANCED == s) { - return "ADVANCED"; - } else if (PlanExecutor::IS_EOF == s) { - return "IS_EOF"; - } else if (PlanExecutor::DEAD == s) { - return "DEAD"; - } else { - verify(PlanExecutor::FAILURE == s); - return "FAILURE"; - } -} - -WorkingSet* PlanExecutor::getWorkingSet() const { - return _workingSet.get(); -} - -PlanStage* PlanExecutor::getRootStage() const { - return _root.get(); -} - -CanonicalQuery* PlanExecutor::getCanonicalQuery() const { - return _cq.get(); -} - -unique_ptr<PlanStageStats> PlanExecutor::getStats() const { - return _root->getStats(); -} - -BSONObjSet PlanExecutor::getOutputSorts() const { - if (_qs && _qs->root) { - _qs->root->computeProperties(); - return _qs->root->getSort(); - } - - if (_root->stageType() == STAGE_MULTI_PLAN) { - // If we needed a MultiPlanStage, the PlanExecutor does not own the QuerySolution. We - // must go through the MultiPlanStage to access the output sort. - auto multiPlanStage = static_cast<MultiPlanStage*>(_root.get()); - if (multiPlanStage->bestSolution()) { - multiPlanStage->bestSolution()->root->computeProperties(); - return multiPlanStage->bestSolution()->root->getSort(); - } - } else if (_root->stageType() == STAGE_SUBPLAN) { - auto subplanStage = static_cast<SubplanStage*>(_root.get()); - if (subplanStage->compositeSolution()) { - subplanStage->compositeSolution()->root->computeProperties(); - return subplanStage->compositeSolution()->root->getSort(); - } - } - - return SimpleBSONObjComparator::kInstance.makeBSONObjSet(); -} - -OperationContext* PlanExecutor::getOpCtx() const { - return _opCtx; -} - -void PlanExecutor::saveState() { - invariant(_currentState == kUsable || _currentState == kSaved); - - // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear, - // mergeSort, sort) which are no longer protected by the storage engine's transactional - // boundaries. - WorkingSetCommon::prepareForSnapshotChange(_workingSet.get()); - - if (!isMarkedAsKilled()) { - _root->saveState(); - } - _currentState = kSaved; -} - -Status PlanExecutor::restoreState() { - try { - return restoreStateWithoutRetrying(); - } catch (const WriteConflictException&) { - if (!_yieldPolicy->canAutoYield()) - throw; - - // Handles retries by calling restoreStateWithoutRetrying() in a loop. - return _yieldPolicy->yieldOrInterrupt(); - } -} - -Status PlanExecutor::restoreStateWithoutRetrying() { - invariant(_currentState == kSaved); - - if (!isMarkedAsKilled()) { - _root->restoreState(); - } - - _currentState = kUsable; - return _killStatus; -} - -void PlanExecutor::detachFromOperationContext() { - invariant(_currentState == kSaved); - _opCtx = nullptr; - _root->detachFromOperationContext(); - _currentState = kDetached; - _everDetachedFromOperationContext = true; -} - -void PlanExecutor::reattachToOperationContext(OperationContext* opCtx) { - invariant(_currentState == kDetached); - - // We're reattaching for a getMore now. Reset the yield timer in order to prevent from - // yielding again right away. - _yieldPolicy->resetTimer(); - - _opCtx = opCtx; - _root->reattachToOperationContext(opCtx); - _currentState = kSaved; -} - -PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) { - Snapshotted<BSONObj> snapshotted; - ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut); - - if (objOut) { - *objOut = snapshotted.value(); - } - - return state; -} - -PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut, - RecordId* dlOut) { - // Detaching from the OperationContext means that the returned snapshot ids could be invalid. - invariant(!_everDetachedFromOperationContext); - return getNextImpl(objOut, dlOut); -} - -bool PlanExecutor::shouldListenForInserts() { - return _cq && _cq->getQueryRequest().isTailableAndAwaitData() && - awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && - awaitDataState(_opCtx).waitForInsertsDeadline > - _opCtx->getServiceContext()->getPreciseClockSource()->now(); -} - -bool PlanExecutor::shouldWaitForInserts() { - // If this is an awaitData-respecting operation and we have time left and we're not interrupted, - // we should wait for inserts. - if (shouldListenForInserts()) { - // We expect awaitData cursors to be yielding. - invariant(_yieldPolicy->canReleaseLocksDuringExecution()); - - // For operations with a last committed opTime, we should not wait if the replication - // coordinator's lastCommittedOpTime has progressed past the client's lastCommittedOpTime. - // In that case, we will return early so that we can inform the client of the new - // lastCommittedOpTime immediately. - if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) { - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - return clientsLastKnownCommittedOpTime(_opCtx) >= replCoord->getLastCommittedOpTime(); - } - return true; - } - return false; -} - -std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() { - // We don't expect to need a capped insert notifier for non-yielding plans. - invariant(_yieldPolicy->canReleaseLocksDuringExecution()); - - // We can only wait if we have a collection; otherwise we should retry immediately when - // we hit EOF. - dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); - auto db = DatabaseHolder::getDatabaseHolder().get(_opCtx, _nss.db()); - invariant(db); - auto collection = db->getCollection(_opCtx, _nss); - invariant(collection); - - return collection->getCappedInsertNotifier(); -} - -PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData, - Snapshotted<BSONObj>* errorObj) { - invariant(notifierData->notifier); - - // The notifier wait() method will not wait unless the version passed to it matches the - // current version of the notifier. Since the version passed to it is the current version - // of the notifier at the time of the previous EOF, we require two EOFs in a row with no - // notifier version change in order to wait. This is sufficient to ensure we never wait - // when data is available. - auto curOp = CurOp::get(_opCtx); - curOp->pauseTimer(); - ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - auto opCtx = _opCtx; - uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - auto yieldResult = _yieldPolicy->yieldOrInterrupt([opCtx, notifierData] { - const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); - }); - notifierData->lastEOFVersion = currentNotifierVersion; - - if (yieldResult.isOK()) { - // There may be more results, try to get more data. - return ADVANCED; - } - - if (errorObj) { - *errorObj = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(yieldResult)); - } - return DEAD; -} - -PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { - if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) { - Status status(ErrorCodes::InternalError, - str::stream() << "PlanExecutor hit planExecutorAlwaysFails fail point"); - *objOut = - Snapshotted<BSONObj>(SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status)); - - return PlanExecutor::FAILURE; - } - - invariant(_currentState == kUsable); - if (isMarkedAsKilled()) { - if (NULL != objOut) { - *objOut = Snapshotted<BSONObj>(SnapshotId(), - WorkingSetCommon::buildMemberStatusObject(_killStatus)); - } - return PlanExecutor::DEAD; - } - - if (!_stash.empty()) { - invariant(objOut && !dlOut); - *objOut = {SnapshotId(), _stash.front()}; - _stash.pop(); - return PlanExecutor::ADVANCED; - } - - // Incremented on every writeConflict, reset to 0 on any successful call to _root->work. - size_t writeConflictsInARow = 0; - - // Capped insert data; declared outside the loop so we hold a shared pointer to the capped - // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped - // insert notifier is necessary for the notifierVersion to advance. - CappedInsertNotifierData cappedInsertNotifierData; - if (shouldListenForInserts()) { - // We always construct the CappedInsertNotifier for awaitData cursors. - cappedInsertNotifierData.notifier = getCappedInsertNotifier(); - } - for (;;) { - // These are the conditions which can cause us to yield: - // 1) The yield policy's timer elapsed, or - // 2) some stage requested a yield, or - // 3) we need to yield and retry due to a WriteConflictException. - // In all cases, the actual yielding happens here. - if (_yieldPolicy->shouldYieldOrInterrupt()) { - auto yieldStatus = _yieldPolicy->yieldOrInterrupt(); - if (!yieldStatus.isOK()) { - if (objOut) { - *objOut = Snapshotted<BSONObj>( - SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus)); - } - return PlanExecutor::DEAD; - } - } - - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState code = _root->work(&id); - - if (code != PlanStage::NEED_YIELD) - writeConflictsInARow = 0; - - if (PlanStage::ADVANCED == code) { - WorkingSetMember* member = _workingSet->get(id); - bool hasRequestedData = true; - - if (NULL != objOut) { - if (WorkingSetMember::RID_AND_IDX == member->getState()) { - if (1 != member->keyData.size()) { - _workingSet->free(id); - hasRequestedData = false; - } else { - // TODO: currently snapshot ids are only associated with documents, and - // not with index keys. - *objOut = Snapshotted<BSONObj>(SnapshotId(), member->keyData[0].keyData); - } - } else if (member->hasObj()) { - *objOut = member->obj; - } else { - _workingSet->free(id); - hasRequestedData = false; - } - } - - if (NULL != dlOut) { - if (member->hasRecordId()) { - *dlOut = member->recordId; - } else { - _workingSet->free(id); - hasRequestedData = false; - } - } - - if (hasRequestedData) { - _workingSet->free(id); - return PlanExecutor::ADVANCED; - } - // This result didn't have the data the caller wanted, try again. - } else if (PlanStage::NEED_YIELD == code) { - invariant(id == WorkingSet::INVALID_ID); - if (!_yieldPolicy->canAutoYield()) { - throw WriteConflictException(); - } - - CurOp::get(_opCtx)->debug().additiveMetrics.incrementWriteConflicts(1); - writeConflictsInARow++; - WriteConflictException::logAndBackoff( - writeConflictsInARow, "plan execution", _nss.ns()); - - // If we're allowed to, we will yield next time through the loop. - if (_yieldPolicy->canAutoYield()) { - _yieldPolicy->forceYield(); - } - } else if (PlanStage::NEED_TIME == code) { - // Fall through to yield check at end of large conditional. - } else if (PlanStage::IS_EOF == code) { - if (MONGO_FAIL_POINT(planExecutorHangBeforeShouldWaitForInserts)) { - log() << "PlanExecutor - planExecutorHangBeforeShouldWaitForInserts fail point " - "enabled. Blocking until fail point is disabled."; - MONGO_FAIL_POINT_PAUSE_WHILE_SET(planExecutorHangBeforeShouldWaitForInserts); - } - if (!shouldWaitForInserts()) { - return PlanExecutor::IS_EOF; - } - const ExecState waitResult = waitForInserts(&cappedInsertNotifierData, objOut); - if (waitResult == PlanExecutor::ADVANCED) { - // There may be more results, keep going. - continue; - } - return waitResult; - } else { - invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code); - - if (NULL != objOut) { - BSONObj statusObj; - invariant(WorkingSet::INVALID_ID != id); - WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &statusObj); - *objOut = Snapshotted<BSONObj>(SnapshotId(), statusObj); - } - - return (PlanStage::DEAD == code) ? PlanExecutor::DEAD : PlanExecutor::FAILURE; - } - } -} - -bool PlanExecutor::isEOF() { - invariant(_currentState == kUsable); - return isMarkedAsKilled() || (_stash.empty() && _root->isEOF()); -} - -void PlanExecutor::markAsKilled(Status killStatus) { - invariant(!killStatus.isOK()); - // If killed multiple times, only retain the first status. - if (_killStatus.isOK()) { - _killStatus = killStatus; - } -} - -void PlanExecutor::dispose(OperationContext* opCtx, CursorManager* cursorManager) { - if (_currentState == kDisposed) { - return; - } - - // If we are registered with the CursorManager we need to be sure to deregister ourselves. - // However, if we have been killed we should not attempt to deregister ourselves, since the - // caller of markAsKilled() will have done that already, and the CursorManager may no longer - // exist. Note that the caller's collection lock prevents us from being marked as killed during - // this method, since any interruption event requires a lock in at least MODE_IX. - if (cursorManager && _registrationToken && !isMarkedAsKilled()) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS)); - cursorManager->deregisterExecutor(this); - } - _root->dispose(opCtx); - _currentState = kDisposed; -} - -Status PlanExecutor::executePlan() { - invariant(_currentState == kUsable); - BSONObj obj; - PlanExecutor::ExecState state = PlanExecutor::ADVANCED; - while (PlanExecutor::ADVANCED == state) { - state = this->getNext(&obj, NULL); - } - - if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) { - if (isMarkedAsKilled()) { - return _killStatus; - } - - auto errorStatus = WorkingSetCommon::getMemberObjectStatus(obj); - invariant(!errorStatus.isOK()); - return errorStatus.withContext(str::stream() << "Exec error resulting in state " - << PlanExecutor::statestr(state)); - } - - invariant(!isMarkedAsKilled()); - invariant(PlanExecutor::IS_EOF == state); - return Status::OK(); -} - - -void PlanExecutor::enqueue(const BSONObj& obj) { - _stash.push(obj.getOwned()); -} - -Timestamp PlanExecutor::getLatestOplogTimestamp() { - if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY)) - return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp(); - if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) - return static_cast<CollectionScan*>(collectionScan)->getLatestOplogTimestamp(); - return Timestamp(); -} - -// -// PlanExecutor::Deleter -// - -PlanExecutor::Deleter::Deleter(OperationContext* opCtx, const Collection* collection) - : _opCtx(opCtx), _cursorManager(collection ? collection->getCursorManager() : nullptr) {} - -void PlanExecutor::Deleter::operator()(PlanExecutor* execPtr) { - try { - invariant(_opCtx); // It is illegal to invoke operator() on a default constructed Deleter. - if (!_dismissed) { - execPtr->dispose(_opCtx, _cursorManager); - } - delete execPtr; - } catch (...) { - std::terminate(); - } -} - -} // namespace mongo |