/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/db/query/subplan_runner.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/diskloc.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/get_runner.h" #include "mongo/db/query/multi_plan_runner.h" #include "mongo/db/query/planner_analysis.h" #include "mongo/db/query/planner_access.h" #include "mongo/db/query/qlog.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/stage_builder.h" #include "mongo/db/query/type_explain.h" namespace mongo { // static bool SubplanRunner::canUseSubplanRunner(const CanonicalQuery& query) { const LiteParsedQuery& lpq = query.getParsed(); const MatchExpression* expr = query.root(); // Only rooted ORs work with the subplan scheme. if (MatchExpression::OR != expr->matchType()) { return false; } // Collection scan // No sort order requested if (lpq.getSort().isEmpty() && expr->matchType() == MatchExpression::AND && expr->numChildren() == 0) { return false; } // Hint provided if (!lpq.getHint().isEmpty()) { return false; } // Min provided // Min queries are a special case of hinted queries. if (!lpq.getMin().isEmpty()) { return false; } // Max provided // Similar to min, max queries are a special case of hinted queries. if (!lpq.getMax().isEmpty()) { return false; } // Tailable cursors won't get cached, just turn into collscans. if (query.getParsed().hasOption(QueryOption_CursorTailable)) { return false; } // Snapshot is really a hint. if (query.getParsed().isSnapshot()) { return false; } return true; } // static Status SubplanRunner::make(Collection* collection, const QueryPlannerParams& params, CanonicalQuery* cq, SubplanRunner** out) { auto_ptr autoRunner(new SubplanRunner(collection, params, cq)); Status planningStatus = autoRunner->planSubqueries(); if (!planningStatus.isOK()) { return planningStatus; } *out = autoRunner.release(); return Status::OK(); } SubplanRunner::SubplanRunner(Collection* collection, const QueryPlannerParams& params, CanonicalQuery* cq) : _state(SubplanRunner::PLANNING), _collection(collection), _plannerParams(params), _query(cq), _killed(false), _policy(Runner::YIELD_MANUAL), _ns(cq->getParsed().ns()) { } SubplanRunner::~SubplanRunner() { while (!_solutions.empty()) { vector solns = _solutions.front(); for (size_t i = 0; i < solns.size(); i++) { delete solns[i]; } _solutions.pop(); } while (!_cqs.empty()) { delete _cqs.front(); _cqs.pop(); } } Runner::RunnerState SubplanRunner::getNext(BSONObj* objOut, DiskLoc* dlOut) { if (_killed) { return Runner::RUNNER_DEAD; } if (isEOF()) { return Runner::RUNNER_EOF; } if (SubplanRunner::PLANNING == _state) { // Try to run as sub-plans. if (runSubplans()) { // If runSubplans returns true we expect something here. invariant(_underlyingRunner.get()); } else if (!_killed) { // Couldn't run as subplans so we'll just call normal getRunner. Runner* runner; Status status = getRunnerAlwaysPlan( _collection, _query.release(), _plannerParams, &runner); if (!status.isOK()) { // We utterly failed. _killed = true; // Propagate the error to the user wrapped in a BSONObj if (NULL != objOut) { BSONObjBuilder bob; bob.append("ok", status.isOK() ? 1.0 : 0.0); bob.append("code", status.code()); bob.append("errmsg", status.reason()); *objOut = bob.obj(); } return Runner::RUNNER_ERROR; } else { _underlyingRunner.reset(runner); _underlyingRunner->setYieldPolicy(_policy); } } // We can change state when we're either killed or we have an underlying runner. invariant(_killed || NULL != _underlyingRunner.get()); _state = SubplanRunner::RUNNING; } if (_killed) { return Runner::RUNNER_DEAD; } if (isEOF()) { return Runner::RUNNER_EOF; } // If we're here we should have planned already. invariant(SubplanRunner::RUNNING == _state); invariant(_underlyingRunner.get()); return _underlyingRunner->getNext(objOut, dlOut); } Status SubplanRunner::planSubqueries() { MatchExpression* theOr = _query->root(); for (size_t i = 0; i < _plannerParams.indices.size(); ++i) { const IndexEntry& ie = _plannerParams.indices[i]; _indexMap[ie.keyPattern] = i; QLOG() << "Subplanner: index " << i << " is " << ie.toString() << endl; } for (size_t i = 0; i < theOr->numChildren(); ++i) { // Turn the i-th child into its own query. MatchExpression* orChild = theOr->getChild(i); CanonicalQuery* orChildCQ; Status childCQStatus = CanonicalQuery::canonicalize(*_query, orChild, &orChildCQ); if (!childCQStatus.isOK()) { mongoutils::str::stream ss; ss << "Subplanner: Can't canonicalize subchild " << orChild->toString() << " " << childCQStatus.reason(); return Status(ErrorCodes::BadValue, ss); } // Make sure it gets cleaned up. auto_ptr safeOrChildCQ(orChildCQ); // Plan the i-th child. vector solutions; // We don't set NO_TABLE_SCAN because peeking at the cache data will keep us from // considering any plan that's a collscan. QLOG() << "Subplanner: planning child " << i << " of " << theOr->numChildren(); Status status = QueryPlanner::plan(*safeOrChildCQ, _plannerParams, &solutions); if (!status.isOK()) { mongoutils::str::stream ss; ss << "Subplanner: Can't plan for subchild " << orChildCQ->toString() << " " << status.reason(); return Status(ErrorCodes::BadValue, ss); } QLOG() << "Subplanner: got " << solutions.size() << " solutions"; if (0 == solutions.size()) { // If one child doesn't have an indexed solution, bail out. mongoutils::str::stream ss; ss << "Subplanner: No solutions for subchild " << orChildCQ->toString(); return Status(ErrorCodes::BadValue, ss); } // Hang onto the canonicalized subqueries and the corresponding query solutions // so that they can be used in subplan running later on. _cqs.push(safeOrChildCQ.release()); _solutions.push(solutions); } return Status::OK(); } bool SubplanRunner::runSubplans() { // This is what we annotate with the index selections and then turn into a solution. auto_ptr theOr( static_cast(_query->root()->shallowClone())); // This is the skeleton of index selections that is inserted into the cache. auto_ptr cacheData(new PlanCacheIndexTree()); for (size_t i = 0; i < theOr->numChildren(); ++i) { MatchExpression* orChild = theOr->getChild(i); auto_ptr orChildCQ(_cqs.front()); _cqs.pop(); // 'solutions' is owned by the SubplanRunner instance until // it is popped from the queue. vector solutions = _solutions.front(); // We already checked for zero solutions in planSubqueries(...). invariant(!solutions.empty()); if (1 == solutions.size()) { // There is only one solution. Transfer ownership to an auto_ptr. _solutions.pop(); auto_ptr autoSoln(solutions[0]); // We want a well-formed *indexed* solution. if (NULL == autoSoln->cacheData.get()) { // For example, we don't cache things for 2d indices. QLOG() << "Subplanner: No cache data for subchild " << orChildCQ->toString(); return false; } if (SolutionCacheData::USE_INDEX_TAGS_SOLN != autoSoln->cacheData->solnType) { QLOG() << "Subplanner: No indexed cache data for subchild " << orChildCQ->toString(); return false; } // Add the index assignments to our original query. Status tagStatus = QueryPlanner::tagAccordingToCache( orChild, autoSoln->cacheData->tree.get(), _indexMap); if (!tagStatus.isOK()) { QLOG() << "Subplanner: Failed to extract indices from subchild" << orChildCQ->toString(); return false; } // Add the child's cache data to the cache data we're creating for the main query. cacheData->children.push_back(autoSoln->cacheData->tree->clone()); } else { // N solutions, rank them. Takes ownership of safeOrChildCQ. MultiPlanRunner* mpr = new MultiPlanRunner(_collection, orChildCQ.release()); // Dump all the solutions into the MPR. The MPR takes ownership of // each solution. _solutions.pop(); for (size_t i = 0; i < solutions.size(); ++i) { WorkingSet* ws; PlanStage* root; verify(StageBuilder::build(*solutions[i], &root, &ws)); // Takes ownership of all arguments. mpr->addPlan(solutions[i], root, ws); } // If we're allowed to yield, let the MPR know. mpr->setYieldPolicy(_policy); // Calling pickBestPlan can yield so we must propagate events down to the MPR. _underlyingRunner.reset(mpr); // Pull out the best plan. size_t bestPlan; BSONObj errorObj; if (!mpr->pickBestPlan(&bestPlan, &errorObj)) { QLOG() << "Subplanner: Failed to pick best plan for subchild " << orChildCQ->toString() << " error obj is " << errorObj.toString(); return false; } // pickBestPlan can yield. Make sure we're not dead any which way. if (_killed) { QLOG() << "Subplanner: Killed while picking best plan for subchild " << orChildCQ->toString(); return false; } QuerySolution* bestSoln = solutions[bestPlan]; if (SolutionCacheData::USE_INDEX_TAGS_SOLN != bestSoln->cacheData->solnType) { QLOG() << "Subplanner: No indexed cache data for subchild " << orChildCQ->toString(); return false; } // Add the index assignments to our original query. Status tagStatus = QueryPlanner::tagAccordingToCache( orChild, bestSoln->cacheData->tree.get(), _indexMap); if (!tagStatus.isOK()) { QLOG() << "Subplanner: Failed to extract indices from subchild" << orChildCQ->toString(); return false; } cacheData->children.push_back(solutions[bestPlan]->cacheData->tree->clone()); } } // Must do this before using the planner functionality. sortUsingTags(theOr.get()); // Use the cached index assignments to build solnRoot. Takes ownership of 'theOr' QuerySolutionNode* solnRoot = QueryPlannerAccess::buildIndexedDataAccess( *_query, theOr.release(), false, _plannerParams.indices); if (NULL == solnRoot) { QLOG() << "Subplanner: Failed to build indexed data path for subplanned query\n"; return false; } QLOG() << "Subplanner: fully tagged tree is " << solnRoot->toString(); // Takes ownership of 'solnRoot' QuerySolution* soln = QueryPlannerAnalysis::analyzeDataAccess(*_query, _plannerParams, solnRoot); if (NULL == soln) { QLOG() << "Subplanner: Failed to analyze subplanned query"; return false; } // We want our franken-solution to be cached. SolutionCacheData* scd = new SolutionCacheData(); scd->tree.reset(cacheData.release()); soln->cacheData.reset(scd); QLOG() << "Subplanner: Composite solution is " << soln->toString() << endl; // We use one of these even if there is one plan. We do this so that the entry is cached // with stats obtained in the same fashion as a competitive ranking would have obtained // them. MultiPlanRunner* mpr = new MultiPlanRunner(_collection, _query.release()); WorkingSet* ws; PlanStage* root; verify(StageBuilder::build(*soln, &root, &ws)); // Takes ownership of all arguments. mpr->addPlan(soln, root, ws); mpr->setYieldPolicy(_policy); _underlyingRunner.reset(mpr); return true; } bool SubplanRunner::isEOF() { if (_killed) { return true; } // If we're still planning we're not done yet. if (SubplanRunner::PLANNING == _state) { return false; } // If we're running we best have a runner. invariant(_underlyingRunner.get()); return _underlyingRunner->isEOF(); } void SubplanRunner::saveState() { if (_killed) { return; } // We're ranking a sub-plan via an MPR or we're streaming results from this Runner. Either // way, pass on the request. if (NULL != _underlyingRunner.get()) { _underlyingRunner->saveState(); } } bool SubplanRunner::restoreState() { if (_killed) { return false; } // We're ranking a sub-plan via an MPR or we're streaming results from this Runner. Either // way, pass on the request. if (NULL != _underlyingRunner.get()) { return _underlyingRunner->restoreState(); } return true; } void SubplanRunner::setYieldPolicy(Runner::YieldPolicy policy) { if (_killed) { return; } // If somebody sets this before calling work() we need to know how to set it in our subquery // runners. _policy = policy; if (NULL != _underlyingRunner.get()) { _underlyingRunner->setYieldPolicy(policy); } } void SubplanRunner::invalidate(const DiskLoc& dl, InvalidationType type) { if (_killed) { return; } if (NULL != _underlyingRunner.get()) { _underlyingRunner->invalidate(dl, type); } } const std::string& SubplanRunner::ns() { return _ns; } void SubplanRunner::kill() { _killed = true; _collection = NULL; if (NULL != _underlyingRunner.get()) { _underlyingRunner->kill(); } } Status SubplanRunner::getInfo(TypeExplain** explain, PlanInfo** planInfo) const { if (SubplanRunner::RUNNING == _state) { invariant(_underlyingRunner.get()); return _underlyingRunner->getInfo(explain, planInfo); } else { return Status(ErrorCodes::BadValue, "no sub-plan to defer getInfo to"); } } } // namespace mongo