From eac8547116b770dbe3906fe4eb07bbfe1bbf3ede Mon Sep 17 00:00:00 2001
From: Justin Seyster
Date: Thu, 4 Nov 2021 00:10:14 -0400
Subject: SERVER-59080 Do not discard work from trial run when replanning is not needed

---
 .../cached_plan_trial_does_not_discard_work.js     | 126 +++++++++++++++++++++
 src/mongo/db/exec/trial_run_tracker.h              |  26 ++++-
 src/mongo/db/query/plan_ranker.h                   |   3 +
 src/mongo/db/query/sbe_cached_solution_planner.cpp |  81 ++++++++-----
 src/mongo/db/query/sbe_cached_solution_planner.h   |  23 +++-
 src/mongo/db/query/sbe_runtime_planner.cpp         |  94 ++++++---------
 src/mongo/db/query/sbe_runtime_planner.h           |  21 +++-
 7 files changed, 277 insertions(+), 97 deletions(-)
 create mode 100644 jstests/core/cached_plan_trial_does_not_discard_work.js
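The heart of this change: a cached plan's trial period no longer throws away the documents it
already produced when the plan turns out to still be good. A standalone sketch of that decision,
where 'Candidate' and 'runTrial' are illustrative stand-ins (not the real planner classes) and only
the control flow is meant to mirror the patch:

    #include <cstddef>
    #include <deque>
    #include <iostream>

    struct Candidate {
        std::deque<int> results;       // documents buffered during the trial run
        bool needsReplanning = false;  // set when the trial exceeds its read budget
    };

    // Run a trial: produce up to 'maxNumResults' documents while counting reads.
    // If the read budget is exhausted, mark the candidate for replanning; the
    // buffered results are kept either way, which is the point of the change.
    Candidate runTrial(size_t docsAvailable, size_t readsPerDoc, size_t maxNumResults,
                       size_t maxReadsBeforeReplan) {
        Candidate candidate;
        size_t reads = 0;
        for (size_t i = 0; i < maxNumResults && i < docsAvailable; ++i) {
            reads += readsPerDoc;
            if (reads > maxReadsBeforeReplan) {
                candidate.needsReplanning = true;  // budget exceeded: replan
                break;
            }
            candidate.results.push_back(static_cast<int>(i));  // buffered, not discarded
        }
        return candidate;
    }

    int main() {
        // The budget mirrors maxReadsBeforeReplan = evictionRatio * decisionReads.
        Candidate c = runTrial(/*docsAvailable*/ 20, /*readsPerDoc*/ 2,
                               /*maxNumResults*/ 101, /*maxReadsBeforeReplan*/ 50);
        std::cout << "buffered=" << c.results.size() << " replan=" << c.needsReplanning << "\n";
    }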
diff --git a/jstests/core/cached_plan_trial_does_not_discard_work.js b/jstests/core/cached_plan_trial_does_not_discard_work.js
new file mode 100644
index 00000000000..9f4cf193d22
--- /dev/null
+++ b/jstests/core/cached_plan_trial_does_not_discard_work.js
@@ -0,0 +1,126 @@
+// Test that, when running a trial of a cached plan that has blocking stages, the planner does not
+// invalidate the plan (and discard its results) at the end of the trial unless replanning is
+// needed.
+//
+// @tags: [
+//   # This test attempts to perform queries and introspect the server's plan cache entries. The
+//   # former operation may be routed to a secondary in the replica set, whereas the latter must be
+//   # routed to the primary.
+//   assumes_read_concern_unchanged,
+//   assumes_read_preference_unchanged,
+//   assumes_unsharded_collection,
+//   does_not_support_stepdowns,
+//   requires_fcv_52,
+//   requires_profiling,
+// ]
+(function() {
+'use strict';
+
+load("jstests/libs/profiler.js");  // For getLatestProfilerEntry.
+load("jstests/libs/sbe_util.js");  // For checkSBEEnabled.
+
+if (checkSBEEnabled(db, ["featureFlagSbePlanCache"])) {
+    jsTest.log("Skipping test because SBE and SBE plan cache are both enabled.");
+    return;
+}
+
+const testDb = db.getSiblingDB('cached_plan_trial_does_not_discard_work');
+assert.commandWorked(testDb.dropDatabase());
+const coll = testDb.getCollection('test');
+
+const queryPlanEvaluationMaxResults = (() => {
+    const getParamRes = assert.commandWorked(
+        testDb.adminCommand({getParameter: 1, internalQueryPlanEvaluationMaxResults: 1}));
+    return getParamRes["internalQueryPlanEvaluationMaxResults"];
+})();
+
+const queryCacheEvictionRatio = (() => {
+    const getParamRes = assert.commandWorked(
+        testDb.adminCommand({getParameter: 1, internalQueryCacheEvictionRatio: 1}));
+    return getParamRes["internalQueryCacheEvictionRatio"];
+})();
+
+assert.commandWorked(coll.createIndex({a: 1}));
+assert.commandWorked(coll.createIndex({b: 1, d: 1}));
+
+// Add enough documents to the collection to ensure that the test query will always run through its
+// "trial period" when using the cached plan.
+const numMatchingDocs = 2 * queryPlanEvaluationMaxResults;
+let bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < 100; i++) {
+    // Add documents that will not match the test query but will favor the {a: 1} index.
+    bulk.insert({a: 0, b: 1, c: i, d: i % 2});
+}
+for (let i = 100; i < 100 + numMatchingDocs; i++) {
+    // Add documents that will match the test query.
+    bulk.insert({a: 1, b: 1, c: i, d: i % 2});
+}
+assert.commandWorked(bulk.execute());
+
+// We enable profiling and run the test query three times. The first two times, it will go through
+// multiplanning. The third time, it should use the plan cached by the earlier executions.
+function runTestQuery(comment) {
+    return coll.find({a: 1, b: 1})
+        .sort({c: 1})
+        .batchSize(numMatchingDocs + 1)
+        .comment(comment)
+        .itcount();
+}
+
+testDb.setProfilingLevel(2);
+let lastComment;
+for (let i = 0; i < 3; i++) {
+    lastComment = `test query: ${i}`;
+    const numResults = runTestQuery(lastComment);
+    assert.eq(numResults, numMatchingDocs);
+}
+
+// Get the profile entry for the third execution, which should have bypassed the multiplanner and
+// used a cached plan.
+const profileEntry = getLatestProfilerEntry(
+    testDb, {'command.find': coll.getName(), 'command.comment': lastComment});
+assert(!profileEntry.fromMultiPlanner, profileEntry);
+assert('planCacheKey' in profileEntry, profileEntry);
+
+// We expect the cached plan to run through its "trial period," but the planner should determine
+// that the cached plan is still good and does _not_ need replanning. Previously, the planner would
+// still need to close the execution tree in this scenario, discarding all the work it had already
+// done. This test ensures that behavior is corrected: the execution tree should only need to be
+// opened 1 time.
+assert.eq(profileEntry.execStats.opens, 1, profileEntry);
+
+const planCacheEntry = (() => {
+    const planCache =
+        coll.getPlanCache().list([{$match: {planCacheKey: profileEntry.planCacheKey}}]);
+    assert.eq(planCache.length, 1, planCache);
+    return planCache[0];
+})();
+
+// Modify the test data so that it will force a replan. We remove all the documents that will match
+// the test query and add non-matching documents that will get examined by the index scan (for
+// either index). The planner's criterion for when a cached index scan has done too much work (and
+// should be replanned) is based on the "works" value in the plan cache entry and the
+// "internalQueryCacheEvictionRatio" server parameter, so we use those values to determine how many
+// documents to add.
+//
+// This portion of the test validates that replanning still works as desired even after the query
+// planner changes to allow "trial periods" that do not discard results when replanning is not
+// necessary.
+assert.commandWorked(coll.remove({a: 1, b: 1}));
+bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < queryCacheEvictionRatio * planCacheEntry.works + 1; i++) {
+    bulk.insert({a: 1, b: 0, c: i});
+    bulk.insert({a: 0, b: 1, c: i});
+}
+assert.commandWorked(bulk.execute());
+
+// Run the query one last time, and get its profile entry to ensure it triggered replanning.
+lastComment = "test query expected to trigger replanning";
+const numResults = runTestQuery(lastComment);
+assert.eq(numResults, 0);
+
+const replanProfileEntry = getLatestProfilerEntry(
+    testDb, {'command.find': coll.getName(), 'command.comment': lastComment});
+assert(replanProfileEntry.replanned, replanProfileEntry);
+}());
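The header change below adds an end-of-trial callback to TrialRunTracker. A self-contained model of
the callback contract, reduced to a single metric ('MiniTracker' is an illustrative name; the real
class tracks several metrics through a template parameter):

    #include <cstddef>
    #include <functional>
    #include <iostream>

    class MiniTracker {
    public:
        MiniTracker(std::function<bool()> onTrialEnd, size_t maxReads)
            : _onTrialEnd(std::move(onTrialEnd)), _maxReads(maxReads) {}

        // Returns true when execution should stop (the trial period is over).
        // When the metric crosses its maximum, the optional callback is consulted;
        // returning false from it "upgrades" the trial run to a normal run.
        bool trackReads(size_t increment) {
            _reads += increment;
            if (_reads > _maxReads && (!_onTrialEnd || _onTrialEnd())) {
                _done = true;
            }
            return _done;
        }

    private:
        std::function<bool()> _onTrialEnd;
        size_t _maxReads;
        size_t _reads = 0;
        bool _done = false;
    };

    int main() {
        // A callback that vetoes the early exit keeps the plan running.
        MiniTracker tracker([] { return false; /* keep executing */ }, /*maxReads*/ 3);
        for (int i = 0; i < 5; ++i) {
            std::cout << "halt=" << tracker.trackReads(1) << "\n";  // always halt=0
        }
    }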
diff --git a/src/mongo/db/exec/trial_run_tracker.h b/src/mongo/db/exec/trial_run_tracker.h
index 3ccde274035..0f93ffbdc73 100644
--- a/src/mongo/db/exec/trial_run_tracker.h
+++ b/src/mongo/db/exec/trial_run_tracker.h
@@ -31,6 +31,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <type_traits>
 
 namespace mongo {
@@ -56,7 +57,7 @@ public:
     };
 
     /**
-     * Constructs a `TrialRunTracker' which indicates that the trial period is over when any
+     * Constructs a 'TrialRunTracker' which indicates that the trial period is over when any
      * 'TrialRunMetric' exceeds the maximum provided at construction.
      *
      * Callers can also pass a value of zero to indicate that the given metric should not be
@@ -66,10 +67,22 @@ public:
               std::enable_if_t<sizeof...(MaxMetrics) == TrialRunMetric::kLastElem, int> = 0>
     TrialRunTracker(MaxMetrics... maxMetrics) : _maxMetrics{maxMetrics...} {}
 
+    /**
+     * Constructs a 'TrialRunTracker' that also has an '_onTrialEnd' function, which gets called
+     * when any 'TrialRunMetric' exceeds its maximum. When an '_onTrialEnd' callback is present, it
+     * must return true for 'trackProgress' to return true. By returning false, '_onTrialEnd' can
+     * prevent tracking from halting plan execution, thereby upgrading a trial run to a normal run.
+     */
+    template <typename... MaxMetrics>
+    TrialRunTracker(std::function<bool()> onTrialEnd, MaxMetrics... maxMetrics)
+        : TrialRunTracker{maxMetrics...} {
+        _onTrialEnd = std::move(onTrialEnd);
+    }
+
     /**
      * Increments the trial run metric specified as a template parameter 'metric' by the
-     * 'metricIncrement' value and returns 'true' if the updated metric value has exceeded
-     * its maximum.
+     * 'metricIncrement' value and, if the updated metric value has exceeded its maximum, calls
+     * the '_onTrialEnd' callback if present and returns true (unless '_onTrialEnd' returned false).
      *
      * This is a no-op, and will return false, if the given metric is not being tracked by this
      * 'TrialRunTracker'.
@@ -91,7 +104,7 @@ public:
         }
 
         _metrics[metric] += metricIncrement;
-        if (_metrics[metric] > _maxMetrics[metric]) {
+        if (_metrics[metric] > _maxMetrics[metric] && callOnTrialEnd()) {
             _done = true;
         }
         return _done;
@@ -103,9 +116,14 @@ public:
         return _metrics[metric];
     }
 
+    bool callOnTrialEnd() {
+        return !_onTrialEnd || _onTrialEnd();
+    }
+
 private:
     const size_t _maxMetrics[TrialRunMetric::kLastElem];
     size_t _metrics[TrialRunMetric::kLastElem]{0};
     bool _done{false};
+    std::function<bool()> _onTrialEnd{};
 };
 }  // namespace mongo
diff --git a/src/mongo/db/query/plan_ranker.h b/src/mongo/db/query/plan_ranker.h
index 4389b0b1fec..5008e88f98e 100644
--- a/src/mongo/db/query/plan_ranker.h
+++ b/src/mongo/db/query/plan_ranker.h
@@ -186,6 +186,9 @@ struct BaseCandidatePlan {
     // Indicates whether this candidate plan has completed the trial run early by achieving one
     // of the trial run metrics.
     bool exitedEarly{false};
+    // Indicates that the trial run for a cached plan crossed the threshold of reads that should
+    // trigger a replanning phase.
+    bool needsReplanning{false};
    // If the candidate plan has failed in a recoverable fashion during the trial run, contains a
    // non-OK status.
    Status status{Status::OK()};
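The planner change below derives a cached plan's read budget from the reads recorded when the plan
was first cached ("decision reads"). A small worked example of that arithmetic; the 10.0 ratio here
is only an example value for the tunable internalQueryCacheEvictionRatio server parameter:

    #include <cstddef>
    #include <iostream>

    // 'decisionReads' is the read count recorded when the plan was cached; the
    // eviction ratio scales it into a trial budget.
    size_t maxReadsBeforeReplan(double evictionRatio, size_t decisionReads) {
        return static_cast<size_t>(evictionRatio * decisionReads);
    }

    int main() {
        // A plan cached after 100 reads may spend up to 1000 reads in its trial
        // before the planner decides to replan.
        std::cout << maxReadsBeforeReplan(10.0, 100) << "\n";  // prints 1000
    }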
diff --git a/src/mongo/db/query/sbe_cached_solution_planner.cpp b/src/mongo/db/query/sbe_cached_solution_planner.cpp
index c5970636a1d..6c058df1286 100644
--- a/src/mongo/db/query/sbe_cached_solution_planner.cpp
+++ b/src/mongo/db/query/sbe_cached_solution_planner.cpp
@@ -50,14 +50,14 @@ CandidatePlans CachedSolutionPlanner::plan(
     const double evictionRatio = internalQueryCacheEvictionRatio;
     const size_t maxReadsBeforeReplan = evictionRatio * _decisionReads;
 
-    auto candidate = [&]() {
-        // In cached solution planning we collect execution stats with an upper bound on reads
-        // allowed per trial run computed based on previous decision reads.
-        auto candidates =
-            collectExecutionStats(std::move(solutions), std::move(roots), maxReadsBeforeReplan);
-        invariant(candidates.size() == 1);
-        return std::move(candidates[0]);
-    }();
+    // In cached solution planning we collect execution stats with an upper bound on reads allowed
+    // per trial run computed based on previous decision reads. If the trial run ends before
+    // reaching EOF, it will use the 'needsReplanningCheck' function to determine if it should
+    // continue executing or immediately terminate execution.
+    auto candidate = collectExecutionStatsForCachedPlan(std::move(solutions[0]),
+                                                        std::move(roots[0].first),
+                                                        std::move(roots[0].second),
+                                                        maxReadsBeforeReplan);
 
     auto explainer = plan_explainer_factory::make(
         candidate.root.get(), &candidate.data, candidate.solution.get());
@@ -75,10 +75,15 @@ CandidatePlans CachedSolutionPlanner::plan(
     auto stats{candidate.root->getStats(false /* includeDebugInfo */)};
     auto numReads{calculateNumberOfReads(stats.get())};
 
-    // If the cached plan hit EOF quickly enough, or still as efficient as before, then no need to
-    // replan. Finalize the cached plan and return it.
-    if (stats->common.isEOF || numReads <= maxReadsBeforeReplan) {
-        return {makeVector(finalizeExecutionPlan(std::move(stats), std::move(candidate))), 0};
+    // If the trial run executed in 'collectExecutionStatsForCachedPlan()' did not determine that
+    // a replan is necessary, then return that plan as is. The executor can continue using it. All
+    // results generated during the trial are stored with the plan so that the executor can return
+    // those to the user as well.
+    if (!candidate.needsReplanning) {
+        tassert(590800,
+                "Cached plan exited early without 'needsReplanning' set.",
+                !candidate.exitedEarly);
+        return {makeVector(std::move(candidate)), 0};
     }
 
     // If we're here, the trial period took more than 'maxReadsBeforeReplan' physical reads. This
@@ -99,22 +104,44 @@ CandidatePlans CachedSolutionPlanner::plan(
             << _decisionReads << " reads but it took at least " << numReads << " reads");
 }
 
-plan_ranker::CandidatePlan CachedSolutionPlanner::finalizeExecutionPlan(
-    std::unique_ptr<PlanStageStats> stats, plan_ranker::CandidatePlan candidate) const {
-    // If the winning stage has exited early, clear the results queue and reopen the plan stage
-    // tree, as we cannot resume such execution tree from where the trial run has stopped, and, as
-    // a result, we cannot stash the results returned so far in the plan executor.
-    if (!stats->common.isEOF && candidate.exitedEarly) {
-        if (_cq.getExplain()) {
-            // We save the stats on early exit if it's either an explain operation, as closing and
-            // re-opening the winning plan (below) changes the stats.
-            candidate.data.savedStatsOnEarlyExit =
-                candidate.root->getStats(true /* includeDebugInfo */);
+plan_ranker::CandidatePlan CachedSolutionPlanner::collectExecutionStatsForCachedPlan(
+    std::unique_ptr<QuerySolution> solution,
+    std::unique_ptr<PlanStage> root,
+    stage_builder::PlanStageData data,
+    size_t maxTrialPeriodNumReads) {
+    const auto maxNumResults{trial_period::getTrialPeriodNumToReturn(_cq)};
+
+    plan_ranker::CandidatePlan candidate{std::move(solution),
+                                         std::move(root),
+                                         std::move(data),
+                                         false /* exitedEarly */,
+                                         false /* needsReplanning */,
+                                         Status::OK()};
+
+    ON_BLOCK_EXIT([rootPtr = candidate.root.get()] { rootPtr->detachFromTrialRunTracker(); });
+
+    auto needsReplanningCheck = [maxTrialPeriodNumReads](PlanStage* candidateRoot) {
+        auto stats{candidateRoot->getStats(false /* includeDebugInfo */)};
+        auto numReads{calculateNumberOfReads(stats.get())};
+        return numReads > maxTrialPeriodNumReads;
+    };
+    auto trackerRequirementCheck = [&needsReplanningCheck, &candidate]() {
+        bool shouldExitEarly = needsReplanningCheck(candidate.root.get());
+        if (!shouldExitEarly) {
+            candidate.root->detachFromTrialRunTracker();
         }
-        candidate.root->close();
-        candidate.root->open(false);
-        // Clear the results queue.
-        candidate.results = decltype(candidate.results){};
+        candidate.needsReplanning = (candidate.needsReplanning || shouldExitEarly);
+        return shouldExitEarly;
+    };
+    auto tracker = std::make_unique<TrialRunTracker>(
+        std::move(trackerRequirementCheck), maxNumResults, maxTrialPeriodNumReads);
+
+    candidate.root->attachToTrialRunTracker(tracker.get());
+
+    auto candidateDone = executeCandidateTrial(&candidate, maxNumResults);
+    if (candidate.status.isOK() && !candidateDone && !candidate.needsReplanning) {
+        candidate.needsReplanning =
+            candidate.needsReplanning || needsReplanningCheck(candidate.root.get());
     }
 
     return candidate;
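The .cpp change above leans on ON_BLOCK_EXIT to guarantee the trial run tracker is detached on
every exit path, while the trial-end callback detaches it eagerly once it decides no replan is
needed (making the final detach a harmless no-op). A minimal scope-guard equivalent, shown only to
document the pattern; this is not mongo's ON_BLOCK_EXIT implementation:

    #include <iostream>
    #include <utility>

    template <typename F>
    class ScopeGuard {
    public:
        explicit ScopeGuard(F f) : _f(std::move(f)) {}
        ~ScopeGuard() { _f(); }
        ScopeGuard(const ScopeGuard&) = delete;
        ScopeGuard& operator=(const ScopeGuard&) = delete;

    private:
        F _f;
    };

    int main() {
        bool attached = true;
        {
            // The cleanup runs on every path out of the scope, mirroring the
            // ON_BLOCK_EXIT call in collectExecutionStatsForCachedPlan.
            ScopeGuard guard([&] { attached = false; });
            // ... trial execution would happen here ...
        }
        std::cout << "attached=" << attached << "\n";  // attached=0
    }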
diff --git a/src/mongo/db/query/sbe_cached_solution_planner.h b/src/mongo/db/query/sbe_cached_solution_planner.h
index 8381d453672..a88b844b1c2 100644
--- a/src/mongo/db/query/sbe_cached_solution_planner.h
+++ b/src/mongo/db/query/sbe_cached_solution_planner.h
@@ -61,10 +61,27 @@ public:
 
 private:
     /**
-     * Finalizes the winning plan before passing it to the caller as a result of the planning.
+     * Executes the "trial" portion of a single plan until it
+     *   - reaches EOF,
+     *   - reaches the 'maxNumResults' limit,
+     *   - early exits via the TrialRunTracker, or
+     *   - returns a failure Status.
+     *
+     * All documents returned by the plan are enqueued into the 'CandidatePlan->results' queue.
+     *
+     * When the trial period ends, this function checks the stats to determine if the number of
+     * reads during the trial meets the criteria for replanning, in which case it sets the
+     * 'needsReplanning' flag of the resulting CandidatePlan to true.
+     *
+     * The execution plan for the resulting CandidatePlan remains open, but if the 'exitedEarly'
+     * flag is set, the plan is in an invalid state and must be closed and reopened before it can be
+     * executed.
      */
-    plan_ranker::CandidatePlan finalizeExecutionPlan(std::unique_ptr<PlanStageStats> stats,
-                                                     plan_ranker::CandidatePlan candidate) const;
+    plan_ranker::CandidatePlan collectExecutionStatsForCachedPlan(
+        std::unique_ptr<QuerySolution> solution,
+        std::unique_ptr<PlanStage> root,
+        stage_builder::PlanStageData data,
+        size_t maxTrialPeriodNumReads);
 
     /**
      * Uses the QueryPlanner and the MultiPlanner to re-generate candidate plans for this
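The next file implements the executeCandidateTrial contract documented above. A distilled,
self-contained version of the loop and its four exit conditions, where 'TrialState' and 'fetchNext'
are stand-ins for CandidatePlan and fetchNextDocument:

    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <iostream>

    struct TrialState {
        bool ok = true;            // mirrors 'status'
        bool exitedEarly = false;  // set via the TrialRunTracker
        bool needsReplanning = false;
        std::deque<int> results;
    };

    // Returns true iff the plan reached EOF; stops on the result limit, a
    // failure, an early exit, or a replanning decision.
    bool executeTrial(TrialState* s, size_t maxNumResults,
                      const std::function<bool(TrialState*)>& fetchNext) {
        for (size_t i = 0;
             i < maxNumResults && s->ok && !s->exitedEarly && !s->needsReplanning;
             ++i) {
            if (fetchNext(s)) {
                return true;  // EOF
            }
        }
        return false;
    }

    int main() {
        TrialState state;
        // A toy plan that yields three documents and then reports EOF.
        auto fetchNext = [n = 0](TrialState* s) mutable {
            if (n == 3) return true;
            s->results.push_back(n++);
            return false;
        };
        std::cout << "eof=" << executeTrial(&state, 101, fetchNext) << "\n";  // eof=1
    }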
diff --git a/src/mongo/db/query/sbe_runtime_planner.cpp b/src/mongo/db/query/sbe_runtime_planner.cpp
index 8389480ab42..ad25e59841a 100644
--- a/src/mongo/db/query/sbe_runtime_planner.cpp
+++ b/src/mongo/db/query/sbe_runtime_planner.cpp
@@ -113,6 +113,30 @@ BaseRuntimePlanner::prepareExecutionPlan(PlanStage* root,
     return std::make_tuple(resultSlot, recordIdSlot, exitedEarly);
 }
 
+bool BaseRuntimePlanner::executeCandidateTrial(plan_ranker::CandidatePlan* candidate,
+                                               size_t maxNumResults) {
+    _indexExistenceChecker.check();
+
+    auto status = prepareExecutionPlan(candidate->root.get(), &candidate->data);
+    if (!status.isOK()) {
+        candidate->status = status.getStatus();
+        return false;
+    }
+
+    auto [resultAccessor, recordIdAccessor, exitedEarly] = status.getValue();
+    candidate->exitedEarly = exitedEarly;
+
+    for (size_t it = 0; it < maxNumResults && candidate->status.isOK() && !candidate->exitedEarly &&
+         !candidate->needsReplanning;
+         ++it) {
+        if (fetchNextDocument(candidate, std::make_pair(resultAccessor, recordIdAccessor))) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 std::vector<plan_ranker::CandidatePlan> BaseRuntimePlanner::collectExecutionStats(
     std::vector<std::unique_ptr<QuerySolution>> solutions,
     std::vector<std::pair<std::unique_ptr<PlanStage>, stage_builder::PlanStageData>> roots,
@@ -121,15 +145,6 @@ std::vector<plan_ranker::CandidatePlan> BaseRuntimePlanner::collectExecutionStat
     std::vector<plan_ranker::CandidatePlan> candidates;
     std::vector<std::pair<value::SlotAccessor*, value::SlotAccessor*>> accessors;
 
-    std::vector<std::pair<PlanStage*, std::unique_ptr<TrialRunTracker>>> trialRunTrackers;
-
-    ON_BLOCK_EXIT([&] {
-        // Detach each SBE plan's TrialRunTracker.
-        while (!trialRunTrackers.empty()) {
-            trialRunTrackers.back().first->detachFromTrialRunTracker();
-            trialRunTrackers.pop_back();
-        }
-    });
     const auto maxNumResults{trial_period::getTrialPeriodNumToReturn(_cq)};
 
@@ -169,60 +184,21 @@ std::vector<plan_ranker::CandidatePlan> BaseRuntimePlanner::collectExecutionStat
             // Attach a unique TrialRunTracker to the plan, which is configured to use at most
             // 'maxNumReads' reads.
             auto tracker = std::make_unique<TrialRunTracker>(trackerResultsBudget, maxNumReads);
+            ON_BLOCK_EXIT([rootPtr = root.get()] { rootPtr->detachFromTrialRunTracker(); });
             root->attachToTrialRunTracker(tracker.get());
-            trialRunTrackers.emplace_back(root.get(), std::move(tracker));
-
-            // Before preparing our plan, verify that none of the required indexes were dropped.
-            // This can occur if a yield occurred during a previously trialed plan.
-            _indexExistenceChecker.check();
-
-            auto status = prepareExecutionPlan(root.get(), &data);
-            auto [resultAccessor, recordIdAccessor, exitedEarly] =
-                [&]() -> std::tuple<value::SlotAccessor*, value::SlotAccessor*, bool> {
-                if (status.isOK()) {
-                    return status.getValue();
-                }
-                // The candidate plan returned a failure that is not fatal to the execution of the
-                // query, as long as we have other candidates that haven't failed. We will mark the
-                // candidate as failed and keep preparing any remaining candidate plans.
-                return {};
-            }();
+
             candidates.push_back({std::move(solutions[planIndex]),
                                   std::move(root),
                                   std::move(data),
-                                  exitedEarly,
-                                  status.getStatus()});
-            accessors.push_back({resultAccessor, recordIdAccessor});
-
-            // The current candidate is located at the end of each vector.
-            auto endIdx = candidates.size() - 1;
-
-            // Run the plan until the plan finishes, uses up its allowed budget of storage reads,
-            // or returns 'maxNumResults' results.
-            for (size_t it = 0; it < maxNumResults; ++it) {
-                // Even if we had a candidate plan that exited early, we still want continue the
-                // trial run for the remaining plans as the early exited plan may not be the best.
-                // For example, it could be blocked in a SORT stage until one of the trial period
-                // metrics was reached, causing the plan to raise an early exit exception and return
-                // control back to the runtime planner. If that happens, we need to continue and
-                // complete the trial period for all candidates, as some of them may have a better
-                // cost.
-                if (!candidates[endIdx].status.isOK() || candidates[endIdx].exitedEarly) {
-                    break;
-                }
-
-                bool candidateDone = fetchNextDocument(&candidates[endIdx], accessors[endIdx]);
-                bool reachedMaxNumResults = (it == maxNumResults - 1);
-
-                // If this plan finished or returned 'maxNumResults', then use its number of reads
-                // as the value for 'maxNumReads' if it's the smallest we've seen.
-                if (candidateDone || reachedMaxNumResults) {
-                    maxNumReads = std::min(
-                        maxNumReads,
-                        trialRunTrackers[endIdx]
-                            .second->getMetric<TrialRunTracker::TrialRunMetric::kNumReads>());
-                    break;
-                }
+                                  false /* exitedEarly */,
+                                  false /* needsReplanning */,
+                                  Status::OK()});
+            auto& currentCandidate = candidates.back();
+            bool candidateDone = executeCandidateTrial(&currentCandidate, maxNumResults);
+            if (candidateDone ||
+                (currentCandidate.status.isOK() && !currentCandidate.exitedEarly)) {
+                maxNumReads = std::min(
+                    maxNumReads, tracker->getMetric<TrialRunTracker::TrialRunMetric::kNumReads>());
             }
         }
     };
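The round-robin logic above now funnels each candidate through executeCandidateTrial and shrinks
the shared read budget each time a candidate completes its trial, so later (slower) candidates are
cut off at the cheapest cost observed so far. A simplified version of that bookkeeping, where
'tightenBudget' is an illustrative helper rather than a function in the tree:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    size_t tightenBudget(size_t maxNumReads, const std::vector<size_t>& finishedPlanReads) {
        for (size_t reads : finishedPlanReads) {
            maxNumReads = std::min(maxNumReads, reads);
        }
        return maxNumReads;
    }

    int main() {
        // Three candidates finished their trials using 400, 150, and 220 reads.
        std::cout << tightenBudget(1000, {400, 150, 220}) << "\n";  // prints 150
    }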
diff --git a/src/mongo/db/query/sbe_runtime_planner.h b/src/mongo/db/query/sbe_runtime_planner.h
index 3776e571e6f..f4f0955abd4 100644
--- a/src/mongo/db/query/sbe_runtime_planner.h
+++ b/src/mongo/db/query/sbe_runtime_planner.h
@@ -97,6 +97,21 @@ protected:
     StatusWith<std::tuple<value::SlotAccessor*, value::SlotAccessor*, bool>> prepareExecutionPlan(
         PlanStage* root, stage_builder::PlanStageData* data) const;
 
+    /**
+     * Executes a candidate plan until it
+     *   - reaches EOF,
+     *   - reaches the 'maxNumResults' limit,
+     *   - early exits via the TrialRunTracker, or
+     *   - returns a failure Status.
+     *
+     * The execution process populates the 'results' array of the 'candidate' plan with any
+     * results from executing the plan. This function also sets the 'status' and 'exitedEarly'
+     * fields of the input 'candidate' object when applicable.
+     *
+     * Returns true iff the candidate plan reaches EOF.
+     */
+    bool executeCandidateTrial(plan_ranker::CandidatePlan* candidate, size_t maxNumResults);
+
     /**
      * Executes each plan in a round-robin fashion to collect execution stats. Stops when:
      *   * Any plan hits EOF.
@@ -108,10 +123,8 @@ protected:
      * Upon completion returns a vector of candidate plans. Execution stats can be obtained for each
      * of the candidate plans by calling 'CandidatePlan->root->getStats()'.
      *
-     * After the trial period ends, all plans remain open.
-     *
-     * The number of reads allowed for a trial execution period is bounded by
-     * 'maxTrialPeriodNumReads'.
+     * After the trial period ends, all plans remain open, but 'exitedEarly' plans are in an
+     * invalid state. Any 'exitedEarly' plans must be closed and reopened before they can be
+     * executed.
      */
     std::vector<plan_ranker::CandidatePlan> collectExecutionStats(
         std::vector<std::unique_ptr<QuerySolution>> solutions,
-- 
cgit v1.2.1