diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-04-23 16:17:05 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2014-04-24 12:26:24 -0400 |
commit | c7f47206ec84013a595d764a3542a3e9ac0305fd (patch) | |
tree | 5118d1bca0f8223a13be4a07f5fa58d7137add57 /src | |
parent | e02aec12a1af456223482fa0e968440700f2e7cc (diff) | |
download | mongo-c7f47206ec84013a595d764a3542a3e9ac0305fd.tar.gz |
SERVER-13632 Pass the collection to PlanExecutor and FetchStage
Diffstat (limited to 'src')
20 files changed, 80 insertions, 44 deletions
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index 92a3c19fbbc..be0636f3341 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -48,7 +48,7 @@ namespace mongo { // What collection? // not owned - Collection* collection; + const Collection* collection; // isNull by default. If you specify any value for this, you're responsible for the DiskLoc // not being invalidated before the first call to work(...). diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp index 8d9b3e28b51..7994ce7ab76 100644 --- a/src/mongo/db/exec/fetch.cpp +++ b/src/mongo/db/exec/fetch.cpp @@ -41,8 +41,17 @@ namespace mongo { MONGO_FP_DECLARE(fetchInMemoryFail); MONGO_FP_DECLARE(fetchInMemorySucceed); - FetchStage::FetchStage(WorkingSet* ws, PlanStage* child, const MatchExpression* filter) - : _ws(ws), _child(child), _filter(filter), _idBeingPagedIn(WorkingSet::INVALID_ID) { } + FetchStage::FetchStage(WorkingSet* ws, + PlanStage* child, + const MatchExpression* filter, + const Collection* collection) + : _collection(collection), + _ws(ws), + _child(child), + _filter(filter), + _idBeingPagedIn(WorkingSet::INVALID_ID) { + + } FetchStage::~FetchStage() { } diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h index 0b05dc389b5..ad32a3ec9ed 100644 --- a/src/mongo/db/exec/fetch.h +++ b/src/mongo/db/exec/fetch.h @@ -45,7 +45,11 @@ namespace mongo { */ class FetchStage : public PlanStage { public: - FetchStage(WorkingSet* ws, PlanStage* child, const MatchExpression* filter); + FetchStage(WorkingSet* ws, + PlanStage* child, + const MatchExpression* filter, + const Collection* collection); + virtual ~FetchStage(); virtual bool isEOF(); @@ -58,6 +62,7 @@ namespace mongo { PlanStageStats* getStats(); private: + /** * If the member (with id memberID) passes our filter, set *out to memberID and return that * ADVANCED. Otherwise, free memberID and return NEED_TIME. @@ -70,6 +75,10 @@ namespace mongo { */ StageState fetchCompleted(WorkingSetID* out); + // Collection which is used by this stage. Used to resolve record ids retrieved by child + // stages. The lifetime of the collection must supersede that of the stage. + const Collection* _collection; + // _ws is not owned by us. WorkingSet* _ws; scoped_ptr<PlanStage> _child; diff --git a/src/mongo/db/exec/s2near.cpp b/src/mongo/db/exec/s2near.cpp index 2af9b6259a7..d7eeda1d280 100644 --- a/src/mongo/db/exec/s2near.cpp +++ b/src/mongo/db/exec/s2near.cpp @@ -275,7 +275,7 @@ namespace mongo { IndexScan* scan = new IndexScan(params, _ws, _keyGeoFilter.get()); // Owns 'scan'. - _child.reset(new FetchStage(_ws, scan, _params.filter)); + _child.reset(new FetchStage(_ws, scan, _params.filter, _params.collection)); _seenInScan.clear(); } diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index 800b5ce1701..2e17b4447b6 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -141,9 +141,9 @@ namespace mongo { // Add a fetch at the top for the user so we can get obj back for sure. // TODO: Do we want to do this for the user? I think so. - PlanStage* rootFetch = new FetchStage(ws.get(), userRoot, NULL); + PlanStage* rootFetch = new FetchStage(ws.get(), userRoot, NULL, collection); - PlanExecutor runner(ws.release(), rootFetch); + PlanExecutor runner(ws.release(), rootFetch, collection); BSONArrayBuilder resultBuilder(result.subarrayStart("results")); @@ -292,7 +292,7 @@ namespace mongo { nodeArgs["node"].Obj(), workingSet, exprs); - return new FetchStage(workingSet, subNode, matcher); + return new FetchStage(workingSet, subNode, matcher, collection); } else if ("limit" == nodeName) { uassert(16937, "Limit stage doesn't have a filter (put it on the child)", diff --git a/src/mongo/db/query/cached_plan_runner.cpp b/src/mongo/db/query/cached_plan_runner.cpp index 60d72b4aa12..9b108cecb4b 100644 --- a/src/mongo/db/query/cached_plan_runner.cpp +++ b/src/mongo/db/query/cached_plan_runner.cpp @@ -54,7 +54,7 @@ namespace mongo { : _collection(collection), _canonicalQuery(canonicalQuery), _solution(solution), - _exec(new PlanExecutor(ws, root)), + _exec(new PlanExecutor(ws, root, collection)), _alreadyProduced(false), _updatedCache(false), _killed(false) { } @@ -210,7 +210,7 @@ namespace mongo { void CachedPlanRunner::setBackupPlan(QuerySolution* qs, PlanStage* root, WorkingSet* ws) { _backupSolution.reset(qs); - _backupPlan.reset(new PlanExecutor(ws, root)); + _backupPlan.reset(new PlanExecutor(ws, root, _collection)); } } // namespace mongo diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index a36b7478714..c6a576d80f9 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -111,7 +111,8 @@ namespace mongo { IndexScan* ix = new IndexScan(params, ws, NULL); if (IXSCAN_FETCH & options) { - return new InternalRunner(collection, new FetchStage(ws, ix, NULL), ws); + return new InternalRunner( + collection, new FetchStage(ws, ix, NULL, collection), ws); } else { return new InternalRunner(collection, ix, ws); diff --git a/src/mongo/db/query/internal_runner.cpp b/src/mongo/db/query/internal_runner.cpp index eea40aa6c7c..1f1384d3c93 100644 --- a/src/mongo/db/query/internal_runner.cpp +++ b/src/mongo/db/query/internal_runner.cpp @@ -42,7 +42,7 @@ namespace mongo { InternalRunner::InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws) : _collection(collection), - _exec(new PlanExecutor(ws, root)), + _exec(new PlanExecutor(ws, root, collection)), _policy(Runner::YIELD_MANUAL) { invariant( collection ); } diff --git a/src/mongo/db/query/multi_plan_runner.cpp b/src/mongo/db/query/multi_plan_runner.cpp index 316fddc7689..867e099f064 100644 --- a/src/mongo/db/query/multi_plan_runner.cpp +++ b/src/mongo/db/query/multi_plan_runner.cpp @@ -364,7 +364,8 @@ namespace mongo { // Run the best plan. Store it. _bestPlan.reset(new PlanExecutor(_candidates[_bestChild].ws, - _candidates[_bestChild].root)); + _candidates[_bestChild].root, + _collection)); _bestPlan->setYieldPolicy(_policy); _alreadyProduced = _candidates[_bestChild].results; _bestSolution.reset(_candidates[_bestChild].solution); @@ -381,7 +382,9 @@ namespace mongo { backupChild = i; _backupSolution = _candidates[i].solution; _backupAlreadyProduced = _candidates[i].results; - _backupPlan = new PlanExecutor(_candidates[i].ws, _candidates[i].root); + _backupPlan = new PlanExecutor(_candidates[i].ws, + _candidates[i].root, + _collection); _backupPlan->setYieldPolicy(_policy); break; } diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 5ce613cf608..bb06da53d3b 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -36,8 +36,8 @@ namespace mongo { - PlanExecutor::PlanExecutor(WorkingSet* ws, PlanStage* rt) - : _workingSet(ws) , _root(rt) , _killed(false) { } + PlanExecutor::PlanExecutor(WorkingSet* ws, PlanStage* rt, const Collection* collection) + : _collection(collection), _workingSet(ws), _root(rt), _killed(false) {} PlanExecutor::~PlanExecutor() { } diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 06214d7cdc2..f2382423d02 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -52,7 +52,7 @@ namespace mongo { */ class PlanExecutor { public: - PlanExecutor(WorkingSet* ws, PlanStage* rt); + PlanExecutor(WorkingSet* ws, PlanStage* rt, const Collection* collection); ~PlanExecutor(); // @@ -99,6 +99,10 @@ namespace mongo { void kill(); private: + // Collection over which this plan executor runs. Used to resolve record ids retrieved by + // the plan stages. The collection must not be destroyed while there are active plans. + const Collection* _collection; + boost::scoped_ptr<WorkingSet> _workingSet; boost::scoped_ptr<PlanStage> _root; boost::scoped_ptr<RunnerYieldPolicy> _yieldPolicy; diff --git a/src/mongo/db/query/single_solution_runner.cpp b/src/mongo/db/query/single_solution_runner.cpp index 65c54e757a2..05fdd079816 100644 --- a/src/mongo/db/query/single_solution_runner.cpp +++ b/src/mongo/db/query/single_solution_runner.cpp @@ -48,7 +48,7 @@ namespace mongo { : _collection( collection ), _canonicalQuery(canonicalQuery), _solution(soln), - _exec(new PlanExecutor(ws, root)) { } + _exec(new PlanExecutor(ws, root, collection)) { } SingleSolutionRunner::~SingleSolutionRunner() { } diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 8d5aef21b1c..125c1ef48c1 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -96,7 +96,7 @@ namespace mongo { const FetchNode* fn = static_cast<const FetchNode*>(root); PlanStage* childStage = buildStages(collection, qsol, fn->children[0], ws); if (NULL == childStage) { return NULL; } - return new FetchStage(ws, childStage, fn->filter.get()); + return new FetchStage(ws, childStage, fn->filter.get(), collection); } else if (STAGE_SORT == root->getType()) { const SortNode* sn = static_cast<const SortNode*>(root); diff --git a/src/mongo/dbtests/query_multi_plan_runner.cpp b/src/mongo/dbtests/query_multi_plan_runner.cpp index 8f3d36f0cfe..77806f66d8b 100644 --- a/src/mongo/dbtests/query_multi_plan_runner.cpp +++ b/src/mongo/dbtests/query_multi_plan_runner.cpp @@ -64,7 +64,7 @@ namespace QueryMultiPlanRunner { } IndexDescriptor* getIndex(Database* db, const BSONObj& obj) { - Collection* collection = db->getCollection( ns() ); + const Collection* collection = db->getCollection( ns() ); return collection->getIndexCatalog()->findIndexByKeyPattern(obj); } @@ -108,9 +108,12 @@ namespace QueryMultiPlanRunner { ixparams.bounds.endKey = BSON("" << 7); ixparams.bounds.endKeyInclusive = true; ixparams.direction = 1; + + const Collection* coll = ctx.ctx().db()->getCollection(ns()); + auto_ptr<WorkingSet> firstWs(new WorkingSet()); IndexScan* ix = new IndexScan(ixparams, firstWs.get(), NULL); - auto_ptr<PlanStage> firstRoot(new FetchStage(firstWs.get(), ix, NULL)); + auto_ptr<PlanStage> firstRoot(new FetchStage(firstWs.get(), ix, NULL, coll)); // Plan 1: CollScan with matcher. CollectionScanParams csparams; @@ -130,7 +133,7 @@ namespace QueryMultiPlanRunner { CanonicalQuery* cq = NULL; verify(CanonicalQuery::canonicalize(ns(), BSON("foo" << 7), &cq).isOK()); verify(NULL != cq); - MultiPlanRunner mpr(ctx.ctx().db()->getCollection(ns()),cq); + MultiPlanRunner mpr(coll, cq); mpr.addPlan(createQuerySolution(), firstRoot.release(), firstWs.release()); mpr.addPlan(createQuerySolution(), secondRoot.release(), secondWs.release()); diff --git a/src/mongo/dbtests/query_single_solution_runner.cpp b/src/mongo/dbtests/query_single_solution_runner.cpp index beae1e0ec7e..9a1774b61c8 100644 --- a/src/mongo/dbtests/query_single_solution_runner.cpp +++ b/src/mongo/dbtests/query_single_solution_runner.cpp @@ -126,16 +126,19 @@ namespace QuerySingleSolutionRunner { ixparams.bounds.endKey = BSON("" << end); ixparams.bounds.endKeyInclusive = true; ixparams.direction = 1; + + const Collection* coll = context.db()->getCollection(ns()); + auto_ptr<WorkingSet> ws(new WorkingSet()); IndexScan* ix = new IndexScan(ixparams, ws.get(), NULL); - auto_ptr<PlanStage> root(new FetchStage(ws.get(), ix, NULL)); + auto_ptr<PlanStage> root(new FetchStage(ws.get(), ix, NULL, coll)); CanonicalQuery* cq; verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); verify(NULL != cq); // Hand the plan off to the single solution runner. - return new SingleSolutionRunner(context.db()->getCollection(ns()), + return new SingleSolutionRunner(coll, cq, new QuerySolution(), root.release(), ws.release()); } diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index 8a8b9ac419c..33784a05d57 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -78,7 +78,7 @@ namespace QueryStageCollectionScan { // Create an executor to handle the scan. WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(params, ws, NULL); - PlanExecutor runner(ws, ps); + PlanExecutor runner(ws, ps, collection()); int resultCount = 0; BSONObj obj; @@ -96,7 +96,7 @@ namespace QueryStageCollectionScan { WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(params, ws, NULL); - PlanExecutor runner(ws, ps); + PlanExecutor runner(ws, ps, collection()); // Going backwards. int resultCount = expectedCount() - 1; @@ -334,7 +334,7 @@ namespace QueryStageCollectionScan { // Make a scan and have the runner own it. WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(params, ws, filterExpr.get()); - PlanExecutor runner(ws, ps); + PlanExecutor runner(ws, ps, params.collection); // Use the runner to count the number of objects scanned. int count = 0; @@ -437,7 +437,7 @@ namespace QueryStageCollectionScan { // Make a scan and have the runner own it. WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(params, ws, NULL); - PlanExecutor runner(ws, ps); + PlanExecutor runner(ws, ps, params.collection); int count = 0; for (BSONObj obj; Runner::RUNNER_ADVANCED == runner.getNext(&obj, NULL); ) { @@ -466,7 +466,7 @@ namespace QueryStageCollectionScan { WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(params, ws, NULL); - PlanExecutor runner(ws, ps); + PlanExecutor runner(ws, ps, params.collection); int count = 0; for (BSONObj obj; Runner::RUNNER_ADVANCED == runner.getNext(&obj, NULL); ) { diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp index 8ca27ed9ad0..0d4cf3b836c 100644 --- a/src/mongo/dbtests/query_stage_fetch.cpp +++ b/src/mongo/dbtests/query_stage_fetch.cpp @@ -118,7 +118,7 @@ namespace QueryStageFetch { mockStage->pushBack(mockMember); } - auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL, coll)); // Set the fail point to return not in memory. FailPointRegistry* reg = getGlobalFailPointRegistry(); @@ -189,7 +189,7 @@ namespace QueryStageFetch { mockStage->pushBack(mockMember); } - auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL, coll)); // Set the fail point to return in memory. FailPointRegistry* reg = getGlobalFailPointRegistry(); @@ -254,7 +254,7 @@ namespace QueryStageFetch { mockStage->pushBack(mockMember); } - auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL, coll)); // Set the fail point to return not in memory. FailPointRegistry* reg = getGlobalFailPointRegistry(); @@ -331,7 +331,7 @@ namespace QueryStageFetch { mockStage->pushBack(mockMember); } - auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL)); + auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), NULL, coll)); // Set the fail point to return not in memory so we get a fetch request. FailPointRegistry* reg = getGlobalFailPointRegistry(); @@ -397,8 +397,8 @@ namespace QueryStageFetch { auto_ptr<MatchExpression> filterExpr(swme.getValue()); // Matcher requires that foo==6 but we only have data with foo==5. - auto_ptr<FetchStage> fetchStage(new FetchStage(&ws, mockStage.release(), - filterExpr.get())); + auto_ptr<FetchStage> fetchStage( + new FetchStage(&ws, mockStage.release(), filterExpr.get(), coll)); // Set the fail point to return not in memory so we get a fetch request. FailPointRegistry* reg = getGlobalFailPointRegistry(); diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index 0063181b14a..78a32521693 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -146,7 +146,7 @@ namespace QueryStageMergeSortTests { ms->addChild(new IndexScan(params, ws, NULL)); // Must fetch if we want to easily pull out an obj. - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); for (int i = 0; i < N; ++i) { BSONObj first, second; @@ -208,7 +208,7 @@ namespace QueryStageMergeSortTests { params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(params, ws, NULL)); - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); for (int i = 0; i < N; ++i) { BSONObj first, second; @@ -270,7 +270,7 @@ namespace QueryStageMergeSortTests { params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(params, ws, NULL)); - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); for (int i = 0; i < N; ++i) { BSONObj first, second; @@ -335,7 +335,7 @@ namespace QueryStageMergeSortTests { params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(params, ws, NULL)); - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); for (int i = 0; i < N; ++i) { BSONObj first, second; @@ -399,7 +399,7 @@ namespace QueryStageMergeSortTests { params.bounds.endKey = BSON("" << 51 << "" << MaxKey); ms->addChild(new IndexScan(params, ws, NULL)); - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); // Only getting results from the a:1 index scan. for (int i = 0; i < N; ++i) { @@ -451,7 +451,7 @@ namespace QueryStageMergeSortTests { ms->addChild(new IndexScan(params, ws, NULL)); } - PlanExecutor runner(ws, new FetchStage(ws, ms, NULL)); + PlanExecutor runner(ws, new FetchStage(ws, ms, NULL, coll), coll); for (int i = 0; i < numIndices; ++i) { BSONObj obj; diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 462a82ef248..2da0d135aa3 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -120,7 +120,8 @@ namespace QueryStageSortTests { params.limit = limit(); // Must fetch so we can look at the doc as a BSONObj. - PlanExecutor runner(ws, new FetchStage(ws, new SortStage(params, ws, ms), NULL)); + PlanExecutor runner( + ws, new FetchStage(ws, new SortStage(params, ws, ms), NULL, coll), coll); // Look at pairs of objects to make sure that the sort order is pairwise (and therefore // totally) correct. @@ -354,7 +355,8 @@ namespace QueryStageSortTests { params.limit = 0; // We don't get results back since we're sorting some parallel arrays. - PlanExecutor runner(ws, new FetchStage(ws, new SortStage(params, ws, ms), NULL)); + PlanExecutor runner( + ws, new FetchStage(ws, new SortStage(params, ws, ms), NULL, coll), coll); Runner::RunnerState runnerState = runner.getNext(NULL, NULL); ASSERT_EQUALS(Runner::RUNNER_ERROR, runnerState); } diff --git a/src/mongo/dbtests/query_stage_tests.cpp b/src/mongo/dbtests/query_stage_tests.cpp index 79e7d733284..3bf9b0ca31f 100644 --- a/src/mongo/dbtests/query_stage_tests.cpp +++ b/src/mongo/dbtests/query_stage_tests.cpp @@ -78,7 +78,9 @@ namespace QueryStageTests { auto_ptr<MatchExpression> filterExpr(swme.getValue()); WorkingSet* ws = new WorkingSet(); - PlanExecutor runner(ws, new IndexScan(params, ws, filterExpr.get())); + PlanExecutor runner(ws, + new IndexScan(params, ws, filterExpr.get()), + ctx.ctx().db()->getCollection(ns())); int count = 0; for (DiskLoc dl; Runner::RUNNER_ADVANCED == runner.getNext(NULL, &dl); ) { |