summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/cached_plan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/cached_plan.cpp')
-rw-r--r--src/mongo/db/exec/cached_plan.cpp551
1 files changed, 265 insertions, 286 deletions
diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp
index 3c512e01890..78894d28d35 100644
--- a/src/mongo/db/exec/cached_plan.cpp
+++ b/src/mongo/db/exec/cached_plan.cpp
@@ -49,338 +49,317 @@
namespace mongo {
- // static
- const char* CachedPlanStage::kStageType = "CACHED_PLAN";
-
- CachedPlanStage::CachedPlanStage(OperationContext* txn,
- Collection* collection,
- WorkingSet* ws,
- CanonicalQuery* cq,
- const QueryPlannerParams& params,
- size_t decisionWorks,
- PlanStage* root)
- : _txn(txn),
- _collection(collection),
- _ws(ws),
- _canonicalQuery(cq),
- _plannerParams(params),
- _decisionWorks(decisionWorks),
- _root(root),
- _commonStats(kStageType) {
- invariant(_collection);
- }
-
- Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
- // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
- // execution work that happens here, so this is needed for the time accounting to
- // make sense.
- ScopedTimer timer(&_commonStats.executionTimeMillis);
-
- // If we work this many times during the trial period, then we will replan the
- // query from scratch.
- size_t maxWorksBeforeReplan = static_cast<size_t>(internalQueryCacheEvictionRatio
- * _decisionWorks);
-
- // The trial period ends without replanning if the cached plan produces this many results.
- size_t numResults = MultiPlanStage::getTrialPeriodNumToReturn(*_canonicalQuery);
-
- for (size_t i = 0; i < maxWorksBeforeReplan; ++i) {
- // Might need to yield between calls to work due to the timer elapsing.
- Status yieldStatus = tryYield(yieldPolicy);
- if (!yieldStatus.isOK()) {
- return yieldStatus;
- }
+// static
+const char* CachedPlanStage::kStageType = "CACHED_PLAN";
+
+CachedPlanStage::CachedPlanStage(OperationContext* txn,
+ Collection* collection,
+ WorkingSet* ws,
+ CanonicalQuery* cq,
+ const QueryPlannerParams& params,
+ size_t decisionWorks,
+ PlanStage* root)
+ : _txn(txn),
+ _collection(collection),
+ _ws(ws),
+ _canonicalQuery(cq),
+ _plannerParams(params),
+ _decisionWorks(decisionWorks),
+ _root(root),
+ _commonStats(kStageType) {
+ invariant(_collection);
+}
+
+Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
+ // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
+ // execution work that happens here, so this is needed for the time accounting to
+ // make sense.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ // If we work this many times during the trial period, then we will replan the
+ // query from scratch.
+ size_t maxWorksBeforeReplan =
+ static_cast<size_t>(internalQueryCacheEvictionRatio * _decisionWorks);
+
+ // The trial period ends without replanning if the cached plan produces this many results.
+ size_t numResults = MultiPlanStage::getTrialPeriodNumToReturn(*_canonicalQuery);
+
+ for (size_t i = 0; i < maxWorksBeforeReplan; ++i) {
+ // Might need to yield between calls to work due to the timer elapsing.
+ Status yieldStatus = tryYield(yieldPolicy);
+ if (!yieldStatus.isOK()) {
+ return yieldStatus;
+ }
- WorkingSetID id = WorkingSet::INVALID_ID;
- PlanStage::StageState state = _root->work(&id);
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = _root->work(&id);
- if (PlanStage::ADVANCED == state) {
- // Save result for later.
- _results.push_back(id);
+ if (PlanStage::ADVANCED == state) {
+ // Save result for later.
+ _results.push_back(id);
- if (_results.size() >= numResults) {
- // Once a plan returns enough results, stop working. Update cache with stats
- // from this run and return.
- updatePlanCache();
- return Status::OK();
- }
- }
- else if (PlanStage::IS_EOF == state) {
- // Cached plan hit EOF quickly enough. No need to replan. Update cache with stats
+ if (_results.size() >= numResults) {
+ // Once a plan returns enough results, stop working. Update cache with stats
// from this run and return.
updatePlanCache();
return Status::OK();
}
- else if (PlanStage::NEED_YIELD == state) {
- if (id == WorkingSet::INVALID_ID) {
- if (!yieldPolicy->allowedToYield()) {
- throw WriteConflictException();
- }
- }
- else {
- WorkingSetMember* member = _ws->get(id);
- invariant(member->hasFetcher());
- // Transfer ownership of the fetcher and yield.
- _fetcher.reset(member->releaseFetcher());
- }
-
- if (yieldPolicy->allowedToYield()) {
- yieldPolicy->forceYield();
- }
-
- Status yieldStatus = tryYield(yieldPolicy);
- if (!yieldStatus.isOK()) {
- return yieldStatus;
+ } else if (PlanStage::IS_EOF == state) {
+ // Cached plan hit EOF quickly enough. No need to replan. Update cache with stats
+ // from this run and return.
+ updatePlanCache();
+ return Status::OK();
+ } else if (PlanStage::NEED_YIELD == state) {
+ if (id == WorkingSet::INVALID_ID) {
+ if (!yieldPolicy->allowedToYield()) {
+ throw WriteConflictException();
}
+ } else {
+ WorkingSetMember* member = _ws->get(id);
+ invariant(member->hasFetcher());
+ // Transfer ownership of the fetcher and yield.
+ _fetcher.reset(member->releaseFetcher());
}
- else if (PlanStage::FAILURE == state) {
- // On failure, fall back to replanning the whole query. We neither evict the
- // existing cache entry nor cache the result of replanning.
- BSONObj statusObj;
- WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
-
- LOG(1) << "Execution of cached plan failed, falling back to replan."
- << " query: "
- << _canonicalQuery->toStringShort()
- << " planSummary: "
- << Explain::getPlanSummary(_root.get())
- << " status: "
- << statusObj;
-
- const bool shouldCache = false;
- return replan(yieldPolicy, shouldCache);
- }
- else if (PlanStage::DEAD == state) {
- BSONObj statusObj;
- WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
-
- LOG(1) << "Execution of cached plan failed: PlanStage died"
- << ", query: "
- << _canonicalQuery->toStringShort()
- << " planSummary: "
- << Explain::getPlanSummary(_root.get())
- << " status: "
- << statusObj;
-
- return WorkingSetCommon::getMemberObjectStatus(statusObj);
- }
- else {
- invariant(PlanStage::NEED_TIME == state);
- }
- }
- // If we're here, the trial period took more than 'maxWorksBeforeReplan' work cycles. This
- // plan is taking too long, so we replan from scratch.
- LOG(1) << "Execution of cached plan required "
- << maxWorksBeforeReplan
- << " works, but was originally cached with only "
- << _decisionWorks
- << " works. Evicting cache entry and replanning query: "
- << _canonicalQuery->toStringShort()
- << " plan summary before replan: "
- << Explain::getPlanSummary(_root.get());
-
- const bool shouldCache = true;
- return replan(yieldPolicy, shouldCache);
- }
+ if (yieldPolicy->allowedToYield()) {
+ yieldPolicy->forceYield();
+ }
- Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
- // These are the conditions which can cause us to yield:
- // 1) The yield policy's timer elapsed, or
- // 2) some stage requested a yield due to a document fetch, or
- // 3) we need to yield and retry due to a WriteConflictException.
- // In all cases, the actual yielding happens here.
- if (yieldPolicy->shouldYield()) {
- // Here's where we yield.
- bool alive = yieldPolicy->yield(_fetcher.get());
-
- if (!alive) {
- return Status(ErrorCodes::OperationFailed,
- "CachedPlanStage killed during plan selection");
+ Status yieldStatus = tryYield(yieldPolicy);
+ if (!yieldStatus.isOK()) {
+ return yieldStatus;
}
+ } else if (PlanStage::FAILURE == state) {
+ // On failure, fall back to replanning the whole query. We neither evict the
+ // existing cache entry nor cache the result of replanning.
+ BSONObj statusObj;
+ WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
+
+ LOG(1) << "Execution of cached plan failed, falling back to replan."
+ << " query: " << _canonicalQuery->toStringShort()
+ << " planSummary: " << Explain::getPlanSummary(_root.get())
+ << " status: " << statusObj;
+
+ const bool shouldCache = false;
+ return replan(yieldPolicy, shouldCache);
+ } else if (PlanStage::DEAD == state) {
+ BSONObj statusObj;
+ WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
+
+ LOG(1) << "Execution of cached plan failed: PlanStage died"
+ << ", query: " << _canonicalQuery->toStringShort()
+ << " planSummary: " << Explain::getPlanSummary(_root.get())
+ << " status: " << statusObj;
+
+ return WorkingSetCommon::getMemberObjectStatus(statusObj);
+ } else {
+ invariant(PlanStage::NEED_TIME == state);
}
-
- // We're done using the fetcher, so it should be freed. We don't want to
- // use the same RecordFetcher twice.
- _fetcher.reset();
-
- return Status::OK();
}
- Status CachedPlanStage::replan(PlanYieldPolicy* yieldPolicy, bool shouldCache) {
- // We're going to start over with a new plan. No need for only old buffered results.
- _results.clear();
-
- // Clear out the working set. We'll start with a fresh working set.
- _ws->clear();
-
- // Use the query planning module to plan the whole query.
- std::vector<QuerySolution*> rawSolutions;
- Status status = QueryPlanner::plan(*_canonicalQuery, _plannerParams, &rawSolutions);
- if (!status.isOK()) {
- return Status(ErrorCodes::BadValue,
- str::stream()
- << "error processing query: " << _canonicalQuery->toString()
- << " planner returned error: " << status.reason());
+ // If we're here, the trial period took more than 'maxWorksBeforeReplan' work cycles. This
+ // plan is taking too long, so we replan from scratch.
+ LOG(1) << "Execution of cached plan required " << maxWorksBeforeReplan
+ << " works, but was originally cached with only " << _decisionWorks
+ << " works. Evicting cache entry and replanning query: "
+ << _canonicalQuery->toStringShort()
+ << " plan summary before replan: " << Explain::getPlanSummary(_root.get());
+
+ const bool shouldCache = true;
+ return replan(yieldPolicy, shouldCache);
+}
+
+Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
+ // These are the conditions which can cause us to yield:
+ // 1) The yield policy's timer elapsed, or
+ // 2) some stage requested a yield due to a document fetch, or
+ // 3) we need to yield and retry due to a WriteConflictException.
+ // In all cases, the actual yielding happens here.
+ if (yieldPolicy->shouldYield()) {
+ // Here's where we yield.
+ bool alive = yieldPolicy->yield(_fetcher.get());
+
+ if (!alive) {
+ return Status(ErrorCodes::OperationFailed,
+ "CachedPlanStage killed during plan selection");
}
+ }
- OwnedPointerVector<QuerySolution> solutions(rawSolutions);
+ // We're done using the fetcher, so it should be freed. We don't want to
+ // use the same RecordFetcher twice.
+ _fetcher.reset();
- // We cannot figure out how to answer the query. Perhaps it requires an index
- // we do not have?
- if (0 == solutions.size()) {
- return Status(ErrorCodes::BadValue,
- str::stream()
- << "error processing query: "
- << _canonicalQuery->toString()
- << " No query solutions");
- }
+ return Status::OK();
+}
- if (1 == solutions.size()) {
- // If there's only one solution, it won't get cached. Make sure to evict the existing
- // cache entry if requested by the caller.
- if (shouldCache) {
- PlanCache* cache = _collection->infoCache()->getPlanCache();
- cache->remove(*_canonicalQuery);
- }
+Status CachedPlanStage::replan(PlanYieldPolicy* yieldPolicy, bool shouldCache) {
+ // We're going to start over with a new plan. No need for only old buffered results.
+ _results.clear();
- PlanStage* newRoot;
- // Only one possible plan. Build the stages from the solution.
- verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &newRoot));
- _root.reset(newRoot);
- _replannedQs.reset(solutions.popAndReleaseBack());
- return Status::OK();
- }
+ // Clear out the working set. We'll start with a fresh working set.
+ _ws->clear();
- // Many solutions. Create a MultiPlanStage to pick the best, update the cache,
- // and so on. The working set will be shared by all candidate plans.
- _root.reset(new MultiPlanStage(_txn, _collection, _canonicalQuery, shouldCache));
- MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_root.get());
+ // Use the query planning module to plan the whole query.
+ std::vector<QuerySolution*> rawSolutions;
+ Status status = QueryPlanner::plan(*_canonicalQuery, _plannerParams, &rawSolutions);
+ if (!status.isOK()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "error processing query: " << _canonicalQuery->toString()
+ << " planner returned error: " << status.reason());
+ }
- for (size_t ix = 0; ix < solutions.size(); ++ix) {
- if (solutions[ix]->cacheData.get()) {
- solutions[ix]->cacheData->indexFilterApplied = _plannerParams.indexFiltersApplied;
- }
+ OwnedPointerVector<QuerySolution> solutions(rawSolutions);
- PlanStage* nextPlanRoot;
- verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws, &nextPlanRoot));
+ // We cannot figure out how to answer the query. Perhaps it requires an index
+ // we do not have?
+ if (0 == solutions.size()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "error processing query: " << _canonicalQuery->toString()
+ << " No query solutions");
+ }
- // Takes ownership of 'solutions[ix]' and 'nextPlanRoot'.
- multiPlanStage->addPlan(solutions.releaseAt(ix), nextPlanRoot, _ws);
+ if (1 == solutions.size()) {
+ // If there's only one solution, it won't get cached. Make sure to evict the existing
+ // cache entry if requested by the caller.
+ if (shouldCache) {
+ PlanCache* cache = _collection->infoCache()->getPlanCache();
+ cache->remove(*_canonicalQuery);
}
- // Delegate to the MultiPlanStage's plan selection facility.
- return multiPlanStage->pickBestPlan(yieldPolicy);
- }
-
- bool CachedPlanStage::isEOF() {
- return _results.empty() && _root->isEOF();
+ PlanStage* newRoot;
+ // Only one possible plan. Build the stages from the solution.
+ verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &newRoot));
+ _root.reset(newRoot);
+ _replannedQs.reset(solutions.popAndReleaseBack());
+ return Status::OK();
}
- PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) {
- ++_commonStats.works;
-
- // Adds the amount of time taken by work() to executionTimeMillis.
- ScopedTimer timer(&_commonStats.executionTimeMillis);
-
- if (isEOF()) { return PlanStage::IS_EOF; }
+ // Many solutions. Create a MultiPlanStage to pick the best, update the cache,
+ // and so on. The working set will be shared by all candidate plans.
+ _root.reset(new MultiPlanStage(_txn, _collection, _canonicalQuery, shouldCache));
+ MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_root.get());
- // First exhaust any results buffered during the trial period.
- if (!_results.empty()) {
- *out = _results.front();
- _results.pop_front();
- _commonStats.advanced++;
- return PlanStage::ADVANCED;
+ for (size_t ix = 0; ix < solutions.size(); ++ix) {
+ if (solutions[ix]->cacheData.get()) {
+ solutions[ix]->cacheData->indexFilterApplied = _plannerParams.indexFiltersApplied;
}
- // Nothing left in trial period buffer.
- StageState childStatus = _root->work(out);
+ PlanStage* nextPlanRoot;
+ verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws, &nextPlanRoot));
- if (PlanStage::ADVANCED == childStatus) {
- _commonStats.advanced++;
- }
- else if (PlanStage::NEED_YIELD == childStatus) {
- _commonStats.needYield++;
- }
- else if (PlanStage::NEED_TIME == childStatus) {
- _commonStats.needTime++;
- }
-
- return childStatus;
+ // Takes ownership of 'solutions[ix]' and 'nextPlanRoot'.
+ multiPlanStage->addPlan(solutions.releaseAt(ix), nextPlanRoot, _ws);
}
- void CachedPlanStage::saveState() {
- _txn = NULL;
- ++_commonStats.yields;
- _root->saveState();
- }
+ // Delegate to the MultiPlanStage's plan selection facility.
+ return multiPlanStage->pickBestPlan(yieldPolicy);
+}
- void CachedPlanStage::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
+bool CachedPlanStage::isEOF() {
+ return _results.empty() && _root->isEOF();
+}
- ++_commonStats.unyields;
- _root->restoreState(opCtx);
- }
+PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
- void CachedPlanStage::invalidate(OperationContext* txn,
- const RecordId& dl,
- InvalidationType type) {
- _root->invalidate(txn, dl, type);
- ++_commonStats.invalidates;
-
- for (std::list<WorkingSetID>::iterator it = _results.begin(); it != _results.end(); ) {
- WorkingSetMember* member = _ws->get(*it);
- if (member->hasLoc() && member->loc == dl) {
- std::list<WorkingSetID>::iterator next = it;
- ++next;
- WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
- _results.erase(it);
- it = next;
- }
- else {
- ++it;
- }
- }
- }
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
- std::vector<PlanStage*> CachedPlanStage::getChildren() const {
- return { _root.get() };
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
}
- PlanStageStats* CachedPlanStage::getStats() {
- _commonStats.isEOF = isEOF();
-
- std::unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_CACHED_PLAN));
- ret->specific.reset(new CachedPlanStats(_specificStats));
- ret->children.push_back(_root->getStats());
-
- return ret.release();
+ // First exhaust any results buffered during the trial period.
+ if (!_results.empty()) {
+ *out = _results.front();
+ _results.pop_front();
+ _commonStats.advanced++;
+ return PlanStage::ADVANCED;
}
- const CommonStats* CachedPlanStage::getCommonStats() const {
- return &_commonStats;
- }
+ // Nothing left in trial period buffer.
+ StageState childStatus = _root->work(out);
- const SpecificStats* CachedPlanStage::getSpecificStats() const {
- return &_specificStats;
+ if (PlanStage::ADVANCED == childStatus) {
+ _commonStats.advanced++;
+ } else if (PlanStage::NEED_YIELD == childStatus) {
+ _commonStats.needYield++;
+ } else if (PlanStage::NEED_TIME == childStatus) {
+ _commonStats.needTime++;
}
- void CachedPlanStage::updatePlanCache() {
- std::unique_ptr<PlanCacheEntryFeedback> feedback(new PlanCacheEntryFeedback());
- feedback->stats.reset(getStats());
- feedback->score = PlanRanker::scoreTree(feedback->stats.get());
-
- PlanCache* cache = _collection->infoCache()->getPlanCache();
- Status fbs = cache->feedback(*_canonicalQuery, feedback.release());
- if (!fbs.isOK()) {
- LOG(5) << _canonicalQuery->ns() << ": Failed to update cache with feedback: "
- << fbs.toString() << " - "
- << "(query: " << _canonicalQuery->getQueryObj()
- << "; sort: " << _canonicalQuery->getParsed().getSort()
- << "; projection: " << _canonicalQuery->getParsed().getProj()
- << ") is no longer in plan cache.";
+ return childStatus;
+}
+
+void CachedPlanStage::saveState() {
+ _txn = NULL;
+ ++_commonStats.yields;
+ _root->saveState();
+}
+
+void CachedPlanStage::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+
+ ++_commonStats.unyields;
+ _root->restoreState(opCtx);
+}
+
+void CachedPlanStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ _root->invalidate(txn, dl, type);
+ ++_commonStats.invalidates;
+
+ for (std::list<WorkingSetID>::iterator it = _results.begin(); it != _results.end();) {
+ WorkingSetMember* member = _ws->get(*it);
+ if (member->hasLoc() && member->loc == dl) {
+ std::list<WorkingSetID>::iterator next = it;
+ ++next;
+ WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
+ _results.erase(it);
+ it = next;
+ } else {
+ ++it;
}
}
+}
+
+std::vector<PlanStage*> CachedPlanStage::getChildren() const {
+ return {_root.get()};
+}
+
+PlanStageStats* CachedPlanStage::getStats() {
+ _commonStats.isEOF = isEOF();
+
+ std::unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_CACHED_PLAN));
+ ret->specific.reset(new CachedPlanStats(_specificStats));
+ ret->children.push_back(_root->getStats());
+
+ return ret.release();
+}
+
+const CommonStats* CachedPlanStage::getCommonStats() const {
+ return &_commonStats;
+}
+
+const SpecificStats* CachedPlanStage::getSpecificStats() const {
+ return &_specificStats;
+}
+
+void CachedPlanStage::updatePlanCache() {
+ std::unique_ptr<PlanCacheEntryFeedback> feedback(new PlanCacheEntryFeedback());
+ feedback->stats.reset(getStats());
+ feedback->score = PlanRanker::scoreTree(feedback->stats.get());
+
+ PlanCache* cache = _collection->infoCache()->getPlanCache();
+ Status fbs = cache->feedback(*_canonicalQuery, feedback.release());
+ if (!fbs.isOK()) {
+ LOG(5) << _canonicalQuery->ns()
+ << ": Failed to update cache with feedback: " << fbs.toString() << " - "
+ << "(query: " << _canonicalQuery->getQueryObj()
+ << "; sort: " << _canonicalQuery->getParsed().getSort()
+ << "; projection: " << _canonicalQuery->getParsed().getProj()
+ << ") is no longer in plan cache.";
+ }
+}
} // namespace mongo