/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/db/exec/cached_plan.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/plan_cache.h"
#include "mongo/db/query/plan_ranker.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/stage_builder.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
// static
const char* CachedPlanStage::kStageType = "CACHED_PLAN";
CachedPlanStage::CachedPlanStage(OperationContext* txn,
Collection* collection,
WorkingSet* ws,
CanonicalQuery* cq,
const QueryPlannerParams& params,
size_t decisionWorks,
PlanStage* root)
: PlanStage(kStageType, txn),
_collection(collection),
_ws(ws),
_canonicalQuery(cq),
_plannerParams(params),
_decisionWorks(decisionWorks) {
invariant(_collection);
_children.emplace_back(root);
}
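// Runs the trial period for the plan recovered from the cache. The cached plan's tree is worked
// up to 'maxWorksBeforeReplan' times (the eviction-ratio knob multiplied by the works recorded
// when the plan was cached). If the plan produces enough results or reaches EOF within that
// budget, the cache entry is kept and given fresh feedback; otherwise the entry is evicted and
// the query is replanned from scratch.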
Status CachedPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
// Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
// execution work that happens here, so this is needed for the time accounting to
// make sense.
ScopedTimer timer(getClock(), &_commonStats.executionTimeMillis);
// If we work this many times during the trial period, then we will replan the
// query from scratch.
size_t maxWorksBeforeReplan =
static_cast<size_t>(internalQueryCacheEvictionRatio * _decisionWorks);
// The trial period ends without replanning if the cached plan produces this many results.
size_t numResults = MultiPlanStage::getTrialPeriodNumToReturn(*_canonicalQuery);
for (size_t i = 0; i < maxWorksBeforeReplan; ++i) {
// Might need to yield between calls to work due to the timer elapsing.
Status yieldStatus = tryYield(yieldPolicy);
if (!yieldStatus.isOK()) {
return yieldStatus;
}
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState state = child()->work(&id);
if (PlanStage::ADVANCED == state) {
// Save result for later.
WorkingSetMember* member = _ws->get(id);
// Ensure that the BSONObj underlying the WorkingSetMember is owned in case we yield.
member->makeObjOwnedIfNeeded();
_results.push_back(id);
if (_results.size() >= numResults) {
// Once a plan returns enough results, stop working. Update cache with stats
// from this run and return.
updatePlanCache();
return Status::OK();
}
} else if (PlanStage::IS_EOF == state) {
// Cached plan hit EOF quickly enough. No need to replan. Update cache with stats
// from this run and return.
updatePlanCache();
return Status::OK();
} else if (PlanStage::NEED_YIELD == state) {
if (id == WorkingSet::INVALID_ID) {
if (!yieldPolicy->allowedToYield()) {
throw WriteConflictException();
}
} else {
WorkingSetMember* member = _ws->get(id);
invariant(member->hasFetcher());
// Transfer ownership of the fetcher and yield.
_fetcher.reset(member->releaseFetcher());
}
if (yieldPolicy->allowedToYield()) {
yieldPolicy->forceYield();
}
Status yieldStatus = tryYield(yieldPolicy);
if (!yieldStatus.isOK()) {
return yieldStatus;
}
} else if (PlanStage::FAILURE == state) {
// On failure, fall back to replanning the whole query. We neither evict the
// existing cache entry nor cache the result of replanning.
BSONObj statusObj;
WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
LOG(1) << "Execution of cached plan failed, falling back to replan."
<< " query: " << redact(_canonicalQuery->toStringShort())
<< " planSummary: " << redact(Explain::getPlanSummary(child().get()))
<< " status: " << redact(statusObj);
const bool shouldCache = false;
return replan(yieldPolicy, shouldCache);
} else if (PlanStage::DEAD == state) {
BSONObj statusObj;
WorkingSetCommon::getStatusMemberObject(*_ws, id, &statusObj);
LOG(1) << "Execution of cached plan failed: PlanStage died"
<< ", query: " << redact(_canonicalQuery->toStringShort())
<< " planSummary: " << redact(Explain::getPlanSummary(child().get()))
<< " status: " << redact(statusObj);
return WorkingSetCommon::getMemberObjectStatus(statusObj);
} else {
invariant(PlanStage::NEED_TIME == state);
}
}
// If we're here, the trial period took more than 'maxWorksBeforeReplan' work cycles. This
// plan is taking too long, so we replan from scratch.
LOG(1) << "Execution of cached plan required " << maxWorksBeforeReplan
<< " works, but was originally cached with only " << _decisionWorks
<< " works. Evicting cache entry and replanning query: "
<< redact(_canonicalQuery->toStringShort())
<< " plan summary before replan: " << redact(Explain::getPlanSummary(child().get()));
const bool shouldCache = true;
return replan(yieldPolicy, shouldCache);
}
Status CachedPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
// These are the conditions which can cause us to yield:
// 1) The yield policy's timer elapsed, or
// 2) some stage requested a yield due to a document fetch, or
// 3) we need to yield and retry due to a WriteConflictException.
// In all cases, the actual yielding happens here.
if (yieldPolicy->shouldYield()) {
// Here's where we yield.
bool alive = yieldPolicy->yield(_fetcher.get());
if (!alive) {
return Status(ErrorCodes::QueryPlanKilled,
"CachedPlanStage killed during plan selection");
}
}
// We're done using the fetcher, so it should be freed. We don't want to
// use the same RecordFetcher twice.
_fetcher.reset();
return Status::OK();
}
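// Discards the trial-period state and plans the query again from scratch. If planning yields a
// single solution it is executed directly, evicting the old cache entry when 'shouldCache' is
// true; multiple solutions are handed to a MultiPlanStage, which ranks them and (again depending
// on 'shouldCache') writes the winner back to the cache.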
Status CachedPlanStage::replan(PlanYieldPolicy* yieldPolicy, bool shouldCache) {
// We're going to start over with a new plan. Clear out info from our old plan.
_results.clear();
_ws->clear();
_children.clear();
_specificStats.replanned = true;
// Use the query planning module to plan the whole query.
std::vector<QuerySolution*> rawSolutions;
Status status = QueryPlanner::plan(*_canonicalQuery, _plannerParams, &rawSolutions);
if (!status.isOK()) {
return Status(ErrorCodes::BadValue,
str::stream() << "error processing query: " << _canonicalQuery->toString()
<< " planner returned error: "
<< status.reason());
}
OwnedPointerVector<QuerySolution> solutions(rawSolutions);
// We cannot figure out how to answer the query. Perhaps it requires an index
// we do not have?
if (0 == solutions.size()) {
return Status(ErrorCodes::BadValue,
str::stream() << "error processing query: " << _canonicalQuery->toString()
<< " No query solutions");
}
if (1 == solutions.size()) {
// If there's only one solution, it won't get cached. Make sure to evict the existing
// cache entry if requested by the caller.
if (shouldCache) {
PlanCache* cache = _collection->infoCache()->getPlanCache();
cache->remove(*_canonicalQuery);
}
PlanStage* newRoot;
// Only one possible plan. Build the stages from the solution.
verify(StageBuilder::build(
getOpCtx(), _collection, *_canonicalQuery, *solutions[0], _ws, &newRoot));
_children.emplace_back(newRoot);
_replannedQs.reset(solutions.popAndReleaseBack());
LOG(1)
<< "Replanning of query resulted in single query solution, which will not be cached. "
<< redact(_canonicalQuery->toStringShort())
<< " plan summary after replan: " << redact(Explain::getPlanSummary(child().get()))
<< " previous cache entry evicted: " << (shouldCache ? "yes" : "no");
return Status::OK();
}
// Many solutions. Create a MultiPlanStage to pick the best, update the cache,
// and so on. The working set will be shared by all candidate plans.
auto cachingMode = shouldCache ? MultiPlanStage::CachingMode::AlwaysCache
: MultiPlanStage::CachingMode::NeverCache;
_children.emplace_back(
new MultiPlanStage(getOpCtx(), _collection, _canonicalQuery, cachingMode));
MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(child().get());
for (size_t ix = 0; ix < solutions.size(); ++ix) {
if (solutions[ix]->cacheData.get()) {
solutions[ix]->cacheData->indexFilterApplied = _plannerParams.indexFiltersApplied;
}
PlanStage* nextPlanRoot;
verify(StageBuilder::build(
getOpCtx(), _collection, *_canonicalQuery, *solutions[ix], _ws, &nextPlanRoot));
// Takes ownership of 'solutions[ix]' and 'nextPlanRoot'.
multiPlanStage->addPlan(solutions.releaseAt(ix), nextPlanRoot, _ws);
}
// Delegate to the MultiPlanStage's plan selection facility.
Status pickBestPlanStatus = multiPlanStage->pickBestPlan(yieldPolicy);
if (!pickBestPlanStatus.isOK()) {
return pickBestPlanStatus;
}
LOG(1) << "Replanning " << redact(_canonicalQuery->toStringShort())
<< " resulted in plan with summary: " << redact(Explain::getPlanSummary(child().get()))
<< ", which " << (shouldCache ? "has" : "has not") << " been written to the cache";
return Status::OK();
}
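// We are EOF only once the results buffered during the trial period have been drained and the
// underlying plan tree is itself exhausted.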
bool CachedPlanStage::isEOF() {
return _results.empty() && child()->isEOF();
}
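// Returns buffered trial-period results first, then delegates to the (possibly replanned) child
// plan.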
PlanStage::StageState CachedPlanStage::doWork(WorkingSetID* out) {
if (isEOF()) {
return PlanStage::IS_EOF;
}
// First exhaust any results buffered during the trial period.
if (!_results.empty()) {
*out = _results.front();
_results.pop_front();
return PlanStage::ADVANCED;
}
// Nothing left in trial period buffer.
return child()->work(out);
}
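// A result buffered during the trial period may reference a RecordId that is about to become
// invalid. Fetch the document into the WorkingSetMember now so the buffered result stays usable.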
void CachedPlanStage::doInvalidate(OperationContext* txn,
const RecordId& dl,
InvalidationType type) {
for (auto it = _results.begin(); it != _results.end(); ++it) {
WorkingSetMember* member = _ws->get(*it);
if (member->hasRecordId() && member->recordId == dl) {
WorkingSetCommon::fetchAndInvalidateRecordId(txn, member, _collection);
}
}
}
std::unique_ptr<PlanStageStats> CachedPlanStage::getStats() {
_commonStats.isEOF = isEOF();
std::unique_ptr<PlanStageStats> ret =
stdx::make_unique<PlanStageStats>(_commonStats, STAGE_CACHED_PLAN);
ret->specific = stdx::make_unique<CachedPlanStats>(_specificStats);
ret->children.emplace_back(child()->getStats());
return ret;
}
const SpecificStats* CachedPlanStage::getSpecificStats() const {
return &_specificStats;
}
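// Scores this trial run and reports the result to the plan cache as feedback for the active
// cache entry.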
void CachedPlanStage::updatePlanCache() {
std::unique_ptr<PlanCacheEntryFeedback> feedback = stdx::make_unique<PlanCacheEntryFeedback>();
feedback->stats = getStats();
feedback->score = PlanRanker::scoreTree(feedback->stats->children[0].get());
PlanCache* cache = _collection->infoCache()->getPlanCache();
Status fbs = cache->feedback(*_canonicalQuery, feedback.release());
if (!fbs.isOK()) {
LOG(5) << _canonicalQuery->ns() << ": Failed to update cache with feedback: " << redact(fbs)
<< " - "
<< "(query: " << redact(_canonicalQuery->getQueryObj())
<< "; sort: " << _canonicalQuery->getQueryRequest().getSort()
<< "; projection: " << _canonicalQuery->getQueryRequest().getProj()
<< ") is no longer in plan cache.";
}
}
} // namespace mongo