/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery

#include "mongo/platform/basic.h"

#include "mongo/db/exec/multi_plan.h"

#include <algorithm>
#include <math.h>

#include "mongo/base/owned_pointer_vector.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/plan_cache.h"
#include "mongo/db/query/plan_ranker.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using std::endl;
using std::list;
using std::unique_ptr;
using std::vector;
using stdx::make_unique;
// static
// Stage name reported in explain output and PlanStageStats trees.
const char* MultiPlanStage::kStageType = "MULTI_PLAN";
// Constructs a stage that races candidate plans for 'cq' against each other over
// 'collection'. Candidates are registered afterwards via addPlan(); no winner is
// chosen yet (both plan indices start at the kNoSuchPlan sentinel). 'cachingMode'
// controls whether pickBestPlan() may write the winner to the plan cache.
MultiPlanStage::MultiPlanStage(OperationContext* txn,
const Collection* collection,
CanonicalQuery* cq,
CachingMode cachingMode)
: PlanStage(kStageType, txn),
_collection(collection),
_cachingMode(cachingMode),
_query(cq),
_bestPlanIdx(kNoSuchPlan),
_backupPlanIdx(kNoSuchPlan),
_failure(false),
_failureCount(0),
_statusMemberId(WorkingSet::INVALID_ID) {
// A null collection would make plan-cache and invalidation calls crash later.
invariant(_collection);
}
// Registers one candidate (solution, execution tree, working set) for the trial
// period. The stage takes ownership of 'root' through _children.
void MultiPlanStage::addPlan(QuerySolution* solution, PlanStage* root, WorkingSet* ws) {
    _candidates.emplace_back(solution, root, ws);
    _children.emplace_back(root);
}
bool MultiPlanStage::isEOF() {
if (_failure) {
return true;
}
// If _bestPlanIdx hasn't been found, can't be at EOF
if (!bestPlanChosen()) {
return false;
}
// We must have returned all our cached results
// and there must be no more results from the best plan.
CandidatePlan& bestPlan = _candidates[_bestPlanIdx];
return bestPlan.results.empty() && bestPlan.root->isEOF();
}
// Returns one result per call: first any results buffered during the trial
// period, then results produced directly by the winning plan's tree. Falls back
// to the backup plan (if any) when the winner fails.
PlanStage::StageState MultiPlanStage::doWork(WorkingSetID* out) {
    if (_failure) {
        *out = _statusMemberId;
        return PlanStage::FAILURE;
    }

    CandidatePlan& winner = _candidates[_bestPlanIdx];

    // Drain results the winner buffered while plans were being ranked.
    if (!winner.results.empty()) {
        WorkingSetID bufferedId = winner.results.front();
        winner.results.pop_front();
        *out = bufferedId;
        return PlanStage::ADVANCED;
    }

    // No buffered results remain; pull directly from the winning tree.
    StageState childState = winner.root->work(out);

    if (hasBackupPlan()) {
        if (PlanStage::FAILURE == childState) {
            LOG(5) << "Best plan errored out switching to backup";
            // Uncache the bad solution if we fall back
            // on the backup solution.
            //
            // XXX: Instead of uncaching we should find a way for the
            // cached plan runner to fall back on a different solution
            // if the best solution fails. Alternatively we could try to
            // defer cache insertion to be after the first produced result.
            _collection->infoCache()->getPlanCache()->remove(*_query);
            _bestPlanIdx = _backupPlanIdx;
            _backupPlanIdx = kNoSuchPlan;
            return _candidates[_bestPlanIdx].root->work(out);
        }

        if (PlanStage::ADVANCED == childState) {
            // The winner produced a real result, so its blocking stage is no
            // longer a concern; the backup plan is unnecessary.
            LOG(5) << "Best plan had a blocking stage, became unblocked";
            _backupPlanIdx = kNoSuchPlan;
        }
    }

    return childState;
}
// Yields if the policy says we should (timer elapsed, a stage requested a
// document fetch, or a WriteConflictException retry is pending). Returns a
// non-OK status — and records it for the caller of doWork() — if the plan was
// killed while yielded.
Status MultiPlanStage::tryYield(PlanYieldPolicy* yieldPolicy) {
    if (yieldPolicy->shouldYield()) {
        const bool stillAlive = yieldPolicy->yield(_fetcher.get());
        if (!stillAlive) {
            _failure = true;
            Status failStat(ErrorCodes::QueryPlanKilled,
                            "PlanExecutor killed during plan selection");
            _statusMemberId = WorkingSetCommon::allocateStatusMember(_candidates[0].ws, failStat);
            return failStat;
        }
    }

    // The fetcher is single-use; release it whether or not we actually yielded.
    _fetcher.reset();

    return Status::OK();
}
// static
// Returns how many times each candidate plan gets worked during the trial
// period: at least 'internalQueryPlanEvaluationWorks', scaled up for large
// collections by 'internalQueryPlanEvaluationCollFraction' of the record count.
size_t MultiPlanStage::getTrialPeriodWorks(OperationContext* txn, const Collection* collection) {
    // Run each plan some number of times. This number is at least as great as
    // 'internalQueryPlanEvaluationWorks', but may be larger for big collections.
    size_t numWorks = internalQueryPlanEvaluationWorks.load();
    if (NULL != collection) {
        // For large collections, the number of works is set to be this
        // fraction of the collection size.
        double fraction = internalQueryPlanEvaluationCollFraction;

        // Restored template arguments: both operands of std::max must be size_t.
        numWorks = std::max(static_cast<size_t>(internalQueryPlanEvaluationWorks.load()),
                            static_cast<size_t>(fraction * collection->numRecords(txn)));
    }

    return numWorks;
}
// static
// Returns the number of results to produce during plan ranking before stopping:
// 'internalQueryPlanEvaluationMaxResults', capped by the query's ntoreturn or
// limit when one is specified.
size_t MultiPlanStage::getTrialPeriodNumToReturn(const CanonicalQuery& query) {
    // Determine the number of results which we will produce during the plan
    // ranking phase before stopping.
    size_t numResults = static_cast<size_t>(internalQueryPlanEvaluationMaxResults.load());
    if (query.getQueryRequest().getNToReturn()) {
        numResults =
            std::min(static_cast<size_t>(*query.getQueryRequest().getNToReturn()), numResults);
    } else if (query.getQueryRequest().getLimit()) {
        numResults = std::min(static_cast<size_t>(*query.getQueryRequest().getLimit()), numResults);
    }

    return numResults;
}
// Runs the trial period (working every candidate up to the work/result budget),
// ranks the candidates, records the winner (and a non-blocking backup plan if
// the winner has a blocking stage), and — depending on '_cachingMode' and the
// ranking outcome — writes the decision to the plan cache. Returns a non-OK
// status if plan selection failed (e.g. the executor was killed during a yield).
Status MultiPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
    // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
    // execution work that happens here, so this is needed for the time accounting to
    // make sense.
    ScopedTimer timer(getClock(), &_commonStats.executionTimeMillis);

    size_t numWorks = getTrialPeriodWorks(getOpCtx(), _collection);
    size_t numResults = getTrialPeriodNumToReturn(*_query);

    // Work the plans, stopping when a plan hits EOF or returns some
    // fixed number of results.
    for (size_t ix = 0; ix < numWorks; ++ix) {
        bool moreToDo = workAllPlans(numResults, yieldPolicy);
        if (!moreToDo) {
            break;
        }
    }

    if (_failure) {
        invariant(WorkingSet::INVALID_ID != _statusMemberId);
        WorkingSetMember* member = _candidates[0].ws->get(_statusMemberId);
        return WorkingSetCommon::getMemberStatus(*member);
    }

    // After picking best plan, ranking will own plan stats from
    // candidate solutions (winner and losers).
    std::unique_ptr<PlanRankingDecision> ranking(new PlanRankingDecision);
    _bestPlanIdx = PlanRanker::pickBestPlan(_candidates, ranking.get());
    verify(_bestPlanIdx >= 0 && _bestPlanIdx < static_cast<int>(_candidates.size()));

    // Copy candidate order. We will need this to sort candidate stats for explain
    // after transferring ownership of 'ranking' to plan cache.
    std::vector<size_t> candidateOrder = ranking->candidateOrder;

    CandidatePlan& bestCandidate = _candidates[_bestPlanIdx];
    std::list<WorkingSetID>& alreadyProduced = bestCandidate.results;
    const auto& bestSolution = bestCandidate.solution;

    LOG(5) << "Winning solution:\n" << redact(bestSolution->toString());
    LOG(2) << "Winning plan: " << redact(Explain::getPlanSummary(bestCandidate.root));

    _backupPlanIdx = kNoSuchPlan;
    if (bestSolution->hasBlockingStage && (0 == alreadyProduced.size())) {
        // The winner never produced a result during the trial, so keep a
        // non-blocking candidate around to fall back on in doWork().
        LOG(5) << "Winner has blocking stage, looking for backup plan...";
        for (size_t ix = 0; ix < _candidates.size(); ++ix) {
            if (!_candidates[ix].solution->hasBlockingStage) {
                LOG(5) << "Candidate " << ix << " is backup child";
                _backupPlanIdx = ix;
                break;
            }
        }
    }

    // Even if the query is of a cacheable shape, the caller might have indicated that we shouldn't
    // write to the plan cache.
    //
    // TODO: We can remove this if we introduce replanning logic to the SubplanStage.
    bool canCache = (_cachingMode == CachingMode::AlwaysCache);
    if (_cachingMode == CachingMode::SometimesCache) {
        // In "sometimes cache" mode, we cache unless we hit one of the special cases below.
        canCache = true;

        if (ranking->tieForBest) {
            // The winning plan tied with the runner-up and we're using "sometimes cache" mode. We
            // will not write a plan cache entry.
            canCache = false;

            // These arrays having two or more entries is implied by 'tieForBest'.
            invariant(ranking->scores.size() > 1U);
            invariant(ranking->candidateOrder.size() > 1U);

            size_t winnerIdx = ranking->candidateOrder[0];
            size_t runnerUpIdx = ranking->candidateOrder[1];

            LOG(1) << "Winning plan tied with runner-up. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0] << " winner summary: "
                   << redact(Explain::getPlanSummary(_candidates[winnerIdx].root))
                   << " runner-up score: " << ranking->scores[1] << " runner-up summary: "
                   << redact(Explain::getPlanSummary(_candidates[runnerUpIdx].root));
        }

        if (alreadyProduced.empty()) {
            // We're using the "sometimes cache" mode, and the winning plan produced no results
            // during the plan ranking trial period. We will not write a plan cache entry.
            canCache = false;

            size_t winnerIdx = ranking->candidateOrder[0];
            LOG(1) << "Winning plan had zero results. Not caching."
                   << " ns: " << _collection->ns() << " " << redact(_query->toStringShort())
                   << " winner score: " << ranking->scores[0] << " winner summary: "
                   << redact(Explain::getPlanSummary(_candidates[winnerIdx].root));
        }
    }

    // Store the choice we just made in the cache, if the query is of a type that is safe to
    // cache.
    if (PlanCache::shouldCacheQuery(*_query) && canCache) {
        // Create list of candidate solutions for the cache with
        // the best solution at the front.
        std::vector<QuerySolution*> solutions;

        // Generate solutions and ranking decisions sorted by score.
        for (size_t orderingIndex = 0; orderingIndex < candidateOrder.size(); ++orderingIndex) {
            // index into candidates/ranking
            size_t ix = candidateOrder[orderingIndex];
            solutions.push_back(_candidates[ix].solution.get());
        }

        // Check solution cache data. Do not add to cache if
        // we have any invalid SolutionCacheData data.
        // XXX: One known example is 2D queries
        bool validSolutions = true;
        for (size_t ix = 0; ix < solutions.size(); ++ix) {
            if (NULL == solutions[ix]->cacheData.get()) {
                LOG(5) << "Not caching query because this solution has no cache data: "
                       << redact(solutions[ix]->toString());
                validSolutions = false;
                break;
            }
        }

        if (validSolutions) {
            _collection->infoCache()->getPlanCache()->add(*_query, solutions, ranking.release());
        }
    }

    return Status::OK();
}
// Gives each still-viable candidate one call to work(), buffering any produced
// results on the candidate for later replay by doWork(). Returns false when the
// trial period should stop: a plan hit EOF, a plan buffered 'numResults'
// results, every plan failed, or a yield killed the executor.
bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolicy) {
bool doneWorking = false;
for (size_t ix = 0; ix < _candidates.size(); ++ix) {
CandidatePlan& candidate = _candidates[ix];
// Skip plans that already failed earlier in the trial.
if (candidate.failed) {
continue;
}
// Might need to yield between calls to work due to the timer elapsing.
if (!(tryYield(yieldPolicy)).isOK()) {
return false;
}
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState state = candidate.root->work(&id);
if (PlanStage::ADVANCED == state) {
// Save result for later.
WorkingSetMember* member = candidate.ws->get(id);
// Ensure that the BSONObj underlying the WorkingSetMember is owned in case we choose to
// return the results from the 'candidate' plan.
member->makeObjOwnedIfNeeded();
candidate.results.push_back(id);
// Once a plan returns enough results, stop working.
if (candidate.results.size() >= numResults) {
doneWorking = true;
}
} else if (PlanStage::IS_EOF == state) {
// First plan to hit EOF wins automatically. Stop evaluating other plans.
// Assumes that the ranking will pick this plan.
doneWorking = true;
} else if (PlanStage::NEED_YIELD == state) {
// The stage wants a yield. With no member id, this is a WriteConflictException
// retry request; otherwise the member carries a fetcher for a document fetch.
if (id == WorkingSet::INVALID_ID) {
if (!yieldPolicy->allowedToYield())
throw WriteConflictException();
} else {
WorkingSetMember* member = candidate.ws->get(id);
invariant(member->hasFetcher());
// Transfer ownership of the fetcher and yield.
_fetcher.reset(member->releaseFetcher());
}
if (yieldPolicy->allowedToYield()) {
yieldPolicy->forceYield();
}
// Perform the yield (or fetch+yield) that was just requested.
if (!(tryYield(yieldPolicy)).isOK()) {
return false;
}
} else if (PlanStage::NEED_TIME != state) {
// FAILURE or DEAD. Do we want to just tank that plan and try the rest? We
// probably want to fail globally as this shouldn't happen anyway.
candidate.failed = true;
++_failureCount;
// Propagate most recent seen failure to parent.
if (PlanStage::FAILURE == state) {
_statusMemberId = id;
}
// Only give up entirely once every candidate has failed.
if (_failureCount == _candidates.size()) {
_failure = true;
return false;
}
}
}
return !doneWorking;
}
namespace {
void invalidateHelper(OperationContext* txn,
WorkingSet* ws, // may flag for review
const RecordId& recordId,
list* idsToInvalidate,
const Collection* collection) {
for (auto it = idsToInvalidate->begin(); it != idsToInvalidate->end(); ++it) {
WorkingSetMember* member = ws->get(*it);
if (member->hasRecordId() && member->recordId == recordId) {
WorkingSetCommon::fetchAndInvalidateRecordId(txn, member, collection);
}
}
}
} // namespace
// Propagates an invalidation to every set of buffered trial-period results that
// might still reference 'recordId': all candidates before a winner is chosen,
// otherwise just the winner (and the backup plan, if one is being kept).
void MultiPlanStage::doInvalidate(OperationContext* txn,
                                  const RecordId& recordId,
                                  InvalidationType type) {
    if (_failure) {
        return;
    }

    if (!bestPlanChosen()) {
        // Trial period still running: any candidate may hold buffered results.
        for (auto& candidate : _candidates) {
            invalidateHelper(txn, candidate.ws, recordId, &candidate.results, _collection);
        }
        return;
    }

    CandidatePlan& winner = _candidates[_bestPlanIdx];
    invalidateHelper(txn, winner.ws, recordId, &winner.results, _collection);
    if (hasBackupPlan()) {
        CandidatePlan& backup = _candidates[_backupPlanIdx];
        invalidateHelper(txn, backup.ws, recordId, &backup.results, _collection);
    }
}
// True while a non-blocking fallback plan is being kept in reserve.
bool MultiPlanStage::hasBackupPlan() const {
    return _backupPlanIdx != kNoSuchPlan;
}
// True once pickBestPlan() has selected a winning candidate.
bool MultiPlanStage::bestPlanChosen() const {
    return _bestPlanIdx != kNoSuchPlan;
}
// Returns the index of the winning candidate, or kNoSuchPlan if no winner
// has been chosen yet.
int MultiPlanStage::bestPlanIdx() const {
return _bestPlanIdx;
}
// Returns the winning candidate's query solution (not owned by the caller),
// or NULL if no winner has been chosen yet.
QuerySolution* MultiPlanStage::bestSolution() {
    return bestPlanChosen() ? _candidates[_bestPlanIdx].solution.get() : NULL;
}
// Builds the stats tree for this stage: common stats plus MultiPlanStats,
// with one child stats subtree per candidate plan.
unique_ptr<PlanStageStats> MultiPlanStage::getStats() {
    _commonStats.isEOF = isEOF();
    unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_MULTI_PLAN);
    ret->specific = make_unique<MultiPlanStats>(_specificStats);
    for (auto&& child : _children) {
        ret->children.emplace_back(child->getStats());
    }
    return ret;
}
// Returns a pointer to this stage's MultiPlan-specific stats; the stage
// retains ownership.
const SpecificStats* MultiPlanStage::getSpecificStats() const {
return &_specificStats;
}
} // namespace mongo