diff options
Diffstat (limited to 'src/mongo/db/exec/cached_plan.cpp')
-rw-r--r-- | src/mongo/db/exec/cached_plan.cpp | 551 |
1 files changed, 265 insertions, 286 deletions
diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp index 3c512e01890..78894d28d35 100644 --- a/src/mongo/db/exec/cached_plan.cpp +++ b/src/mongo/db/exec/cached_plan.cpp @@ -49,338 +49,317 @@ namespace mongo { - // static - const char* CachedPlanStage::kStageType = "CACHED_PLAN"; - - CachedPlanStage::CachedPlanStage(OperationContext* txn, - Collection* collection, - WorkingSet* ws, - CanonicalQuery* cq, - const QueryPlannerParams& params, - size_t decisionWorks, - PlanStage* root) - : _txn(txn), - _collection(collection), - _ws(ws), - _canonicalQuery(cq), - _plannerParams(params), - _decisionWorks(decisionWorks), - _root(root), - _commonStats(kStageType) { - invariant(_collection); - } - - Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { - // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of - // execution work that happens here, so this is needed for the time accounting to - // make sense. - ScopedTimer timer(&_commonStats.executionTimeMillis); - - // If we work this many times during the trial period, then we will replan the - // query from scratch. - size_t maxWorksBeforeReplan = static_cast<size_t>(internalQueryCacheEvictionRatio - * _decisionWorks); - - // The trial period ends without replanning if the cached plan produces this many results. - size_t numResults = MultiPlanStage::getTrialPeriodNumToReturn(*_canonicalQuery); - - for (size_t i = 0; i < maxWorksBeforeReplan; ++i) { - // Might need to yield between calls to work due to the timer elapsing. - Status yieldStatus = tryYield(yieldPolicy); - if (!yieldStatus.isOK()) { - return yieldStatus; - } +// static +const char* CachedPlanStage::kStageType = "CACHED_PLAN"; + +CachedPlanStage::CachedPlanStage(OperationContext* txn, + Collection* collection, + WorkingSet* ws, + CanonicalQuery* cq, + const QueryPlannerParams& params, + size_t decisionWorks, + PlanStage* root) + : _txn(txn), + _collection(collection), + _ws(ws), + _canonicalQuery(cq), + _plannerParams(params), + _decisionWorks(decisionWorks), + _root(root), + _commonStats(kStageType) { + invariant(_collection); +} + +Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { + // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of + // execution work that happens here, so this is needed for the time accounting to + // make sense. + ScopedTimer timer(&_commonStats.executionTimeMillis); + + // If we work this many times during the trial period, then we will replan the + // query from scratch. + size_t maxWorksBeforeReplan = + static_cast<size_t>(internalQueryCacheEvictionRatio * _decisionWorks); + + // The trial period ends without replanning if the cached plan produces this many results. + size_t numResults = MultiPlanStage::getTrialPeriodNumToReturn(*_canonicalQuery); + + for (size_t i = 0; i < maxWorksBeforeReplan; ++i) { + // Might need to yield between calls to work due to the timer elapsing. + Status yieldStatus = tryYield(yieldPolicy); + if (!yieldStatus.isOK()) { + return yieldStatus; + } - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = _root->work(&id); + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = _root->work(&id); - if (PlanStage::ADVANCED == state) { - // Save result for later. - _results.push_back(id); + if (PlanStage::ADVANCED == state) { + // Save result for later. + _results.push_back(id); - if (_results.size() >= numResults) { - // Once a plan returns enough results, stop working. Update cache with stats - // from this run and return. - updatePlanCache(); - return Status::OK(); - } - } - else if (PlanStage::IS_EOF == state) { - // Cached plan hit EOF quickly enough. No need to replan. Update cache with stats + if (_results.size() >= numResults) { + // Once a plan returns enough results, stop working. Update cache with stats // from this run and return. updatePlanCache(); return Status::OK(); } - else if (PlanStage::NEED_YIELD == state) { - if (id == WorkingSet::INVALID_ID) { - if (!yieldPolicy->allowedToYield()) { - throw WriteConflictException(); - } - } - else { - WorkingSetMember* member = _ws->get(id); - invariant(member->hasFetcher()); - // Transfer ownership of the fetcher and yield. - _fetcher.reset(member->releaseFetcher()); - } - - if (yieldPolicy->allowedToYield()) { - yieldPolicy->forceYield(); - } - - Status yieldStatus = tryYield(yieldPolicy); - if (!yieldStatus.isOK()) { - return yieldStatus; + } else if (PlanStage::IS_EOF == state) { + // Cached plan hit EOF quickly enough. No need to replan. Update cache with stats + // from this run and return. + updatePlanCache(); + return Status::OK(); + } else if (PlanStage::NEED_YIELD == state) { + if (id == WorkingSet::INVALID_ID) { + if (!yieldPolicy->allowedToYield()) { + throw WriteConflictException(); } + } else { + WorkingSetMember* member = _ws->get(id); + invariant(member->hasFetcher()); + // Transfer ownership of the fetcher and yield. + _fetcher.reset(member->releaseFetcher()); } - else if (PlanStage::FAILURE == state) { - // On failure, fall back to replanning the whole query. We neither evict the - // existing cache entry nor cache the result of replanning. - BSONObj statusObj; - WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj); - - LOG(1) << "Execution of cached plan failed, falling back to replan." - << " query: " - << _canonicalQuery->toStringShort() - << " planSummary: " - << Explain::getPlanSummary(_root.get()) - << " status: " - << statusObj; - - const bool shouldCache = false; - return replan(yieldPolicy, shouldCache); - } - else if (PlanStage::DEAD == state) { - BSONObj statusObj; - WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj); - - LOG(1) << "Execution of cached plan failed: PlanStage died" - << ", query: " - << _canonicalQuery->toStringShort() - << " planSummary: " - << Explain::getPlanSummary(_root.get()) - << " status: " - << statusObj; - - return WorkingSetCommon::getMemberObjectStatus(statusObj); - } - else { - invariant(PlanStage::NEED_TIME == state); - } - } - // If we're here, the trial period took more than 'maxWorksBeforeReplan' work cycles. This - // plan is taking too long, so we replan from scratch. - LOG(1) << "Execution of cached plan required " - << maxWorksBeforeReplan - << " works, but was originally cached with only " - << _decisionWorks - << " works. Evicting cache entry and replanning query: " - << _canonicalQuery->toStringShort() - << " plan summary before replan: " - << Explain::getPlanSummary(_root.get()); - - const bool shouldCache = true; - return replan(yieldPolicy, shouldCache); - } + if (yieldPolicy->allowedToYield()) { + yieldPolicy->forceYield(); + } - Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) { - // These are the conditions which can cause us to yield: - // 1) The yield policy's timer elapsed, or - // 2) some stage requested a yield due to a document fetch, or - // 3) we need to yield and retry due to a WriteConflictException. - // In all cases, the actual yielding happens here. - if (yieldPolicy->shouldYield()) { - // Here's where we yield. - bool alive = yieldPolicy->yield(_fetcher.get()); - - if (!alive) { - return Status(ErrorCodes::OperationFailed, - "CachedPlanStage killed during plan selection"); + Status yieldStatus = tryYield(yieldPolicy); + if (!yieldStatus.isOK()) { + return yieldStatus; } + } else if (PlanStage::FAILURE == state) { + // On failure, fall back to replanning the whole query. We neither evict the + // existing cache entry nor cache the result of replanning. + BSONObj statusObj; + WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj); + + LOG(1) << "Execution of cached plan failed, falling back to replan." + << " query: " << _canonicalQuery->toStringShort() + << " planSummary: " << Explain::getPlanSummary(_root.get()) + << " status: " << statusObj; + + const bool shouldCache = false; + return replan(yieldPolicy, shouldCache); + } else if (PlanStage::DEAD == state) { + BSONObj statusObj; + WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj); + + LOG(1) << "Execution of cached plan failed: PlanStage died" + << ", query: " << _canonicalQuery->toStringShort() + << " planSummary: " << Explain::getPlanSummary(_root.get()) + << " status: " << statusObj; + + return WorkingSetCommon::getMemberObjectStatus(statusObj); + } else { + invariant(PlanStage::NEED_TIME == state); } - - // We're done using the fetcher, so it should be freed. We don't want to - // use the same RecordFetcher twice. - _fetcher.reset(); - - return Status::OK(); } - Status CachedPlanStage::replan(PlanYieldPolicy* yieldPolicy, bool shouldCache) { - // We're going to start over with a new plan. No need for only old buffered results. - _results.clear(); - - // Clear out the working set. We'll start with a fresh working set. - _ws->clear(); - - // Use the query planning module to plan the whole query. - std::vector<QuerySolution*> rawSolutions; - Status status = QueryPlanner::plan(*_canonicalQuery, _plannerParams, &rawSolutions); - if (!status.isOK()) { - return Status(ErrorCodes::BadValue, - str::stream() - << "error processing query: " << _canonicalQuery->toString() - << " planner returned error: " << status.reason()); + // If we're here, the trial period took more than 'maxWorksBeforeReplan' work cycles. This + // plan is taking too long, so we replan from scratch. + LOG(1) << "Execution of cached plan required " << maxWorksBeforeReplan + << " works, but was originally cached with only " << _decisionWorks + << " works. Evicting cache entry and replanning query: " + << _canonicalQuery->toStringShort() + << " plan summary before replan: " << Explain::getPlanSummary(_root.get()); + + const bool shouldCache = true; + return replan(yieldPolicy, shouldCache); +} + +Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) { + // These are the conditions which can cause us to yield: + // 1) The yield policy's timer elapsed, or + // 2) some stage requested a yield due to a document fetch, or + // 3) we need to yield and retry due to a WriteConflictException. + // In all cases, the actual yielding happens here. + if (yieldPolicy->shouldYield()) { + // Here's where we yield. + bool alive = yieldPolicy->yield(_fetcher.get()); + + if (!alive) { + return Status(ErrorCodes::OperationFailed, + "CachedPlanStage killed during plan selection"); } + } - OwnedPointerVector<QuerySolution> solutions(rawSolutions); + // We're done using the fetcher, so it should be freed. We don't want to + // use the same RecordFetcher twice. + _fetcher.reset(); - // We cannot figure out how to answer the query. Perhaps it requires an index - // we do not have? - if (0 == solutions.size()) { - return Status(ErrorCodes::BadValue, - str::stream() - << "error processing query: " - << _canonicalQuery->toString() - << " No query solutions"); - } + return Status::OK(); +} - if (1 == solutions.size()) { - // If there's only one solution, it won't get cached. Make sure to evict the existing - // cache entry if requested by the caller. - if (shouldCache) { - PlanCache* cache = _collection->infoCache()->getPlanCache(); - cache->remove(*_canonicalQuery); - } +Status CachedPlanStage::replan(PlanYieldPolicy* yieldPolicy, bool shouldCache) { + // We're going to start over with a new plan. No need for only old buffered results. + _results.clear(); - PlanStage* newRoot; - // Only one possible plan. Build the stages from the solution. - verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &newRoot)); - _root.reset(newRoot); - _replannedQs.reset(solutions.popAndReleaseBack()); - return Status::OK(); - } + // Clear out the working set. We'll start with a fresh working set. + _ws->clear(); - // Many solutions. Create a MultiPlanStage to pick the best, update the cache, - // and so on. The working set will be shared by all candidate plans. - _root.reset(new MultiPlanStage(_txn, _collection, _canonicalQuery, shouldCache)); - MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_root.get()); + // Use the query planning module to plan the whole query. + std::vector<QuerySolution*> rawSolutions; + Status status = QueryPlanner::plan(*_canonicalQuery, _plannerParams, &rawSolutions); + if (!status.isOK()) { + return Status(ErrorCodes::BadValue, + str::stream() << "error processing query: " << _canonicalQuery->toString() + << " planner returned error: " << status.reason()); + } - for (size_t ix = 0; ix < solutions.size(); ++ix) { - if (solutions[ix]->cacheData.get()) { - solutions[ix]->cacheData->indexFilterApplied = _plannerParams.indexFiltersApplied; - } + OwnedPointerVector<QuerySolution> solutions(rawSolutions); - PlanStage* nextPlanRoot; - verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws, &nextPlanRoot)); + // We cannot figure out how to answer the query. Perhaps it requires an index + // we do not have? + if (0 == solutions.size()) { + return Status(ErrorCodes::BadValue, + str::stream() << "error processing query: " << _canonicalQuery->toString() + << " No query solutions"); + } - // Takes ownership of 'solutions[ix]' and 'nextPlanRoot'. - multiPlanStage->addPlan(solutions.releaseAt(ix), nextPlanRoot, _ws); + if (1 == solutions.size()) { + // If there's only one solution, it won't get cached. Make sure to evict the existing + // cache entry if requested by the caller. + if (shouldCache) { + PlanCache* cache = _collection->infoCache()->getPlanCache(); + cache->remove(*_canonicalQuery); } - // Delegate to the MultiPlanStage's plan selection facility. - return multiPlanStage->pickBestPlan(yieldPolicy); - } - - bool CachedPlanStage::isEOF() { - return _results.empty() && _root->isEOF(); + PlanStage* newRoot; + // Only one possible plan. Build the stages from the solution. + verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &newRoot)); + _root.reset(newRoot); + _replannedQs.reset(solutions.popAndReleaseBack()); + return Status::OK(); } - PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) { - ++_commonStats.works; - - // Adds the amount of time taken by work() to executionTimeMillis. - ScopedTimer timer(&_commonStats.executionTimeMillis); - - if (isEOF()) { return PlanStage::IS_EOF; } + // Many solutions. Create a MultiPlanStage to pick the best, update the cache, + // and so on. The working set will be shared by all candidate plans. + _root.reset(new MultiPlanStage(_txn, _collection, _canonicalQuery, shouldCache)); + MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_root.get()); - // First exhaust any results buffered during the trial period. - if (!_results.empty()) { - *out = _results.front(); - _results.pop_front(); - _commonStats.advanced++; - return PlanStage::ADVANCED; + for (size_t ix = 0; ix < solutions.size(); ++ix) { + if (solutions[ix]->cacheData.get()) { + solutions[ix]->cacheData->indexFilterApplied = _plannerParams.indexFiltersApplied; } - // Nothing left in trial period buffer. - StageState childStatus = _root->work(out); + PlanStage* nextPlanRoot; + verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws, &nextPlanRoot)); - if (PlanStage::ADVANCED == childStatus) { - _commonStats.advanced++; - } - else if (PlanStage::NEED_YIELD == childStatus) { - _commonStats.needYield++; - } - else if (PlanStage::NEED_TIME == childStatus) { - _commonStats.needTime++; - } - - return childStatus; + // Takes ownership of 'solutions[ix]' and 'nextPlanRoot'. + multiPlanStage->addPlan(solutions.releaseAt(ix), nextPlanRoot, _ws); } - void CachedPlanStage::saveState() { - _txn = NULL; - ++_commonStats.yields; - _root->saveState(); - } + // Delegate to the MultiPlanStage's plan selection facility. + return multiPlanStage->pickBestPlan(yieldPolicy); +} - void CachedPlanStage::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; +bool CachedPlanStage::isEOF() { + return _results.empty() && _root->isEOF(); +} - ++_commonStats.unyields; - _root->restoreState(opCtx); - } +PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) { + ++_commonStats.works; - void CachedPlanStage::invalidate(OperationContext* txn, - const RecordId& dl, - InvalidationType type) { - _root->invalidate(txn, dl, type); - ++_commonStats.invalidates; - - for (std::list<WorkingSetID>::iterator it = _results.begin(); it != _results.end(); ) { - WorkingSetMember* member = _ws->get(*it); - if (member->hasLoc() && member->loc == dl) { - std::list<WorkingSetID>::iterator next = it; - ++next; - WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); - _results.erase(it); - it = next; - } - else { - ++it; - } - } - } + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_commonStats.executionTimeMillis); - std::vector<PlanStage*> CachedPlanStage::getChildren() const { - return { _root.get() }; + if (isEOF()) { + return PlanStage::IS_EOF; } - PlanStageStats* CachedPlanStage::getStats() { - _commonStats.isEOF = isEOF(); - - std::unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_CACHED_PLAN)); - ret->specific.reset(new CachedPlanStats(_specificStats)); - ret->children.push_back(_root->getStats()); - - return ret.release(); + // First exhaust any results buffered during the trial period. + if (!_results.empty()) { + *out = _results.front(); + _results.pop_front(); + _commonStats.advanced++; + return PlanStage::ADVANCED; } - const CommonStats* CachedPlanStage::getCommonStats() const { - return &_commonStats; - } + // Nothing left in trial period buffer. + StageState childStatus = _root->work(out); - const SpecificStats* CachedPlanStage::getSpecificStats() const { - return &_specificStats; + if (PlanStage::ADVANCED == childStatus) { + _commonStats.advanced++; + } else if (PlanStage::NEED_YIELD == childStatus) { + _commonStats.needYield++; + } else if (PlanStage::NEED_TIME == childStatus) { + _commonStats.needTime++; } - void CachedPlanStage::updatePlanCache() { - std::unique_ptr<PlanCacheEntryFeedback> feedback(new PlanCacheEntryFeedback()); - feedback->stats.reset(getStats()); - feedback->score = PlanRanker::scoreTree(feedback->stats.get()); - - PlanCache* cache = _collection->infoCache()->getPlanCache(); - Status fbs = cache->feedback(*_canonicalQuery, feedback.release()); - if (!fbs.isOK()) { - LOG(5) << _canonicalQuery->ns() << ": Failed to update cache with feedback: " - << fbs.toString() << " - " - << "(query: " << _canonicalQuery->getQueryObj() - << "; sort: " << _canonicalQuery->getParsed().getSort() - << "; projection: " << _canonicalQuery->getParsed().getProj() - << ") is no longer in plan cache."; + return childStatus; +} + +void CachedPlanStage::saveState() { + _txn = NULL; + ++_commonStats.yields; + _root->saveState(); +} + +void CachedPlanStage::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + + ++_commonStats.unyields; + _root->restoreState(opCtx); +} + +void CachedPlanStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { + _root->invalidate(txn, dl, type); + ++_commonStats.invalidates; + + for (std::list<WorkingSetID>::iterator it = _results.begin(); it != _results.end();) { + WorkingSetMember* member = _ws->get(*it); + if (member->hasLoc() && member->loc == dl) { + std::list<WorkingSetID>::iterator next = it; + ++next; + WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); + _results.erase(it); + it = next; + } else { + ++it; } } +} + +std::vector<PlanStage*> CachedPlanStage::getChildren() const { + return {_root.get()}; +} + +PlanStageStats* CachedPlanStage::getStats() { + _commonStats.isEOF = isEOF(); + + std::unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_CACHED_PLAN)); + ret->specific.reset(new CachedPlanStats(_specificStats)); + ret->children.push_back(_root->getStats()); + + return ret.release(); +} + +const CommonStats* CachedPlanStage::getCommonStats() const { + return &_commonStats; +} + +const SpecificStats* CachedPlanStage::getSpecificStats() const { + return &_specificStats; +} + +void CachedPlanStage::updatePlanCache() { + std::unique_ptr<PlanCacheEntryFeedback> feedback(new PlanCacheEntryFeedback()); + feedback->stats.reset(getStats()); + feedback->score = PlanRanker::scoreTree(feedback->stats.get()); + + PlanCache* cache = _collection->infoCache()->getPlanCache(); + Status fbs = cache->feedback(*_canonicalQuery, feedback.release()); + if (!fbs.isOK()) { + LOG(5) << _canonicalQuery->ns() + << ": Failed to update cache with feedback: " << fbs.toString() << " - " + << "(query: " << _canonicalQuery->getQueryObj() + << "; sort: " << _canonicalQuery->getParsed().getSort() + << "; projection: " << _canonicalQuery->getParsed().getProj() + << ") is no longer in plan cache."; + } +} } // namespace mongo |