author     David Storch <david.storch@10gen.com>   2014-07-15 17:30:02 -0400
committer  David Storch <david.storch@10gen.com>   2014-07-22 09:24:11 -0400
commit     7ffac7f351b80f84589349e44693a94d5cc5e14c (patch)
tree       3707298920eabe877af03963f3f61f74c5a61fc5 /src
parent     fb270d89cbcfdb98c3cee3e631c76ca035c7b4f0 (diff)
download   mongo-7ffac7f351b80f84589349e44693a94d5cc5e14c.tar.gz
SERVER-14407 replace Runner with PlanExecutor
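The caller-side pattern after this change, as it recurs across the hunks below (dbcommands.cpp, group.cpp, distinct.cpp, find_and_modify.cpp), reduces to the following sketch. The function name runQuery is illustrative only, and the snippet depends on mongod-internal headers, so it is not compilable standalone; every call it makes is taken from the patch itself.

```cpp
// Illustrative sketch of the post-patch consumer pattern; mirrors the
// getExecutor/getNext usage in the hunks below. 'runQuery' is a made-up name.
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"

Status runQuery(OperationContext* txn, Collection* collection, CanonicalQuery* cq) {
    PlanExecutor* rawExec;
    Status status = getExecutor(txn, collection, cq, &rawExec, QueryPlannerParams::DEFAULT);
    if (!status.isOK()) {
        return status;
    }
    auto_ptr<PlanExecutor> exec(rawExec);

    // Register the executor so it is notified of document deletions and
    // collection drops while the plan yields its locks (see
    // CollectionCursorCache::registerExecutor in the first hunk).
    const ScopedExecutorRegistration safety(exec.get());

    BSONObj obj;
    Runner::RunnerState state;  // the state enum still lives on Runner at this commit
    while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) {
        // ...consume obj...
    }
    return (Runner::RUNNER_ERROR == state)
        ? Status(ErrorCodes::InternalError, "executor error while iterating")
        : Status::OK();
}
```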
Diffstat (limited to 'src')
115 files changed, 1114 insertions, 1064 deletions
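The patch also starts converting one-off Runner subclasses into PlanStages wrapped by a generic PlanExecutor (MultiIteratorStage, PipelineProxyStage). A hedged sketch of that ownership arrangement, pieced together from the parallel_collection_scan.cpp hunk below; the constructor signatures shown are assumed from this commit only:

```cpp
// Sketch: building an executor around a custom root stage, as in
// parallelCollectionScan below. Not standalone-compilable.
WorkingSet* ws = new WorkingSet();
MultiIteratorStage* root = new MultiIteratorStage(ws, collection);

// The PlanExecutor takes ownership of both the working set and the stage tree.
PlanExecutor* exec = new PlanExecutor(ws, root, collection);

// Handing the executor to a ClientCursor transfers ownership again; callers
// later fetch it with cc->getExecutor() (formerly cc->getRunner()).
ClientCursor* cc = new ClientCursor(collection, exec);
```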
diff --git a/src/mongo/db/catalog/collection_cursor_cache.cpp b/src/mongo/db/catalog/collection_cursor_cache.cpp index c8e8feae704..d358e7a4a52 100644 --- a/src/mongo/db/catalog/collection_cursor_cache.cpp +++ b/src/mongo/db/catalog/collection_cursor_cache.cpp @@ -37,7 +37,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/query/runner.h" +#include "mongo/db/query/plan_executor.h" #include "mongo/platform/random.h" #include "mongo/util/startup_test.h" @@ -276,16 +276,16 @@ namespace mongo { void CollectionCursorCache::invalidateAll( bool collectionGoingAway ) { SimpleMutex::scoped_lock lk( _mutex ); - for ( RunnerSet::iterator it = _nonCachedRunners.begin(); - it != _nonCachedRunners.end(); + for ( ExecSet::iterator it = _nonCachedExecutors.begin(); + it != _nonCachedExecutors.end(); ++it ) { - // we kill the runner, but it deletes itself - Runner* runner = *it; - runner->kill(); - invariant( runner->collection() == NULL ); + // we kill the executor, but it deletes itself + PlanExecutor* exec = *it; + exec->kill(); + invariant( exec->collection() == NULL ); } - _nonCachedRunners.clear(); + _nonCachedExecutors.clear(); if ( collectionGoingAway ) { // we're going to wipe out the world @@ -294,7 +294,7 @@ namespace mongo { cc->kill(); - invariant( cc->getRunner() == NULL || cc->getRunner()->collection() == NULL ); + invariant( cc->getExecutor() == NULL || cc->getExecutor()->collection() == NULL ); // If there is a pinValue >= 100, somebody is actively using the CC and we do // not delete it. Instead we notify the holder that we killed it. The holder @@ -310,7 +310,7 @@ namespace mongo { else { CursorMap newMap; - // collection will still be around, just all Runners are invalid + // collection will still be around, just all PlanExecutors are invalid for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { ClientCursor* cc = i->second; @@ -320,10 +320,10 @@ namespace mongo { continue; } - // Note that a valid ClientCursor state is "no cursor no runner." This is because + // Note that a valid ClientCursor state is "no cursor no executor." This is because // the set of active cursor IDs in ClientCursor is used as representation of query // state. See sharding_block.h. TODO(greg,hk): Move this out. 
- if (NULL == cc->getRunner() ) { + if (NULL == cc->getExecutor() ) { newMap.insert( *i ); continue; } @@ -334,9 +334,9 @@ namespace mongo { } else { // this is pinned, so still alive, so we leave around - // we kill the Runner to signal - if ( cc->getRunner() ) - cc->getRunner()->kill(); + // we kill the PlanExecutor to signal + if ( cc->getExecutor() ) + cc->getExecutor()->kill(); newMap.insert( *i ); } @@ -350,18 +350,18 @@ namespace mongo { InvalidationType type ) { SimpleMutex::scoped_lock lk( _mutex ); - for ( RunnerSet::iterator it = _nonCachedRunners.begin(); - it != _nonCachedRunners.end(); + for ( ExecSet::iterator it = _nonCachedExecutors.begin(); + it != _nonCachedExecutors.end(); ++it ) { - Runner* runner = *it; - runner->invalidate(dl, type); + PlanExecutor* exec = *it; + exec->invalidate(dl, type); } for ( CursorMap::const_iterator i = _cursors.begin(); i != _cursors.end(); ++i ) { - Runner* runner = i->second->getRunner(); - if ( runner ) { - runner->invalidate(dl, type); + PlanExecutor* exec = i->second->getExecutor(); + if ( exec ) { + exec->invalidate(dl, type); } } } @@ -387,18 +387,18 @@ namespace mongo { return toDelete.size(); } - void CollectionCursorCache::registerRunner( Runner* runner ) { + void CollectionCursorCache::registerExecutor( PlanExecutor* exec ) { if (!useExperimentalDocLocking) { SimpleMutex::scoped_lock lk(_mutex); - const std::pair<RunnerSet::iterator, bool> result = _nonCachedRunners.insert(runner); + const std::pair<ExecSet::iterator, bool> result = _nonCachedExecutors.insert(exec); invariant(result.second); // make sure this was inserted } } - void CollectionCursorCache::deregisterRunner( Runner* runner ) { + void CollectionCursorCache::deregisterExecutor( PlanExecutor* exec ) { if (!useExperimentalDocLocking) { SimpleMutex::scoped_lock lk(_mutex); - _nonCachedRunners.erase(runner); + _nonCachedExecutors.erase(exec); } } diff --git a/src/mongo/db/catalog/collection_cursor_cache.h b/src/mongo/db/catalog/collection_cursor_cache.h index c1f3647b021..524f1e12e11 100644 --- a/src/mongo/db/catalog/collection_cursor_cache.h +++ b/src/mongo/db/catalog/collection_cursor_cache.h @@ -41,14 +41,14 @@ namespace mongo { class OperationContext; class PseudoRandom; - class Runner; + class PlanExecutor; class CollectionCursorCache { public: CollectionCursorCache( const StringData& ns ); /** - * will kill() all Runner instances it has + * will kill() all PlanExecutor instances it has */ ~CollectionCursorCache(); @@ -62,8 +62,8 @@ namespace mongo { void invalidateAll( bool collectionGoingAway ); /** - * Broadcast a document invalidation to all relevant Runner(s). invalidateDocument must - * called *before* the provided DiskLoc is about to be deleted or mutated. + * Broadcast a document invalidation to all relevant PlanExecutor(s). invalidateDocument + * must called *before* the provided DiskLoc is about to be deleted or mutated. */ void invalidateDocument( const DiskLoc& dl, InvalidationType type ); @@ -78,16 +78,16 @@ namespace mongo { // ----------------- /** - * Register a runner so that it can be notified of deletion/invalidation during yields. - * Must be called before a runner yields. If a runner is cached (inside a ClientCursor) it - * MUST NOT be registered; the two are mutually exclusive. + * Register an executor so that it can be notified of deletion/invalidation during yields. + * Must be called before an executor yields. If an executor is cached (inside a + * ClientCursor) it MUST NOT be registered; the two are mutually exclusive. 
*/ - void registerRunner(Runner* runner); + void registerExecutor(PlanExecutor* exec); /** - * Remove a runner from the runner registry. + * Remove an executor from the registry. */ - void deregisterRunner(Runner* runner); + void deregisterExecutor(PlanExecutor* exec); // ----------------- @@ -130,8 +130,8 @@ namespace mongo { SimpleMutex _mutex; - typedef unordered_set<Runner*> RunnerSet; - RunnerSet _nonCachedRunners; + typedef unordered_set<PlanExecutor*> ExecSet; + ExecSet _nonCachedExecutors; typedef std::map<CursorId,ClientCursor*> CursorMap; CursorMap _cursors; diff --git a/src/mongo/db/catalog/index_catalog.cpp b/src/mongo/db/catalog/index_catalog.cpp index 310f224657e..edb6219f607 100644 --- a/src/mongo/db/catalog/index_catalog.cpp +++ b/src/mongo/db/catalog/index_catalog.cpp @@ -221,14 +221,14 @@ namespace mongo { return Status::OK(); // these checks have already been done } - auto_ptr<Runner> runner( + auto_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, db->_indexesName, db->getCollection(txn, db->_indexesName))); BSONObj index; Runner::RunnerState state; - while ( Runner::RUNNER_ADVANCED == (state = runner->getNext(&index, NULL)) ) { + while ( Runner::RUNNER_ADVANCED == (state = exec->getNext(&index, NULL)) ) { const BSONObj key = index.getObjectField("key"); const string plugin = IndexNames::findPluginName(key); if ( IndexNames::existedBefore24(plugin) ) diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index b230121d533..96bf25f1c94 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -104,7 +104,7 @@ namespace mongo { unsigned long long n = 0; unsigned long long numDropped = 0; - auto_ptr<Runner> runner(InternalPlanner::collectionScan(txn,ns,collection)); + auto_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn,ns,collection)); std::string idxName = descriptor->indexName(); @@ -114,7 +114,7 @@ namespace mongo { BSONObj js; DiskLoc loc; - while (Runner::RUNNER_ADVANCED == runner->getNext(&js, &loc)) { + while (Runner::RUNNER_ADVANCED == exec->getNext(&js, &loc)) { try { if ( !dupsAllowed && dropDups ) { LastError::Disabled led( lastError.get() ); @@ -131,15 +131,15 @@ namespace mongo { // TODO: Does exception really imply dropDups exception? if (dropDups) { - bool runnerEOF = runner->isEOF(); - runner->saveState(); + bool execEOF = exec->isEOF(); + exec->saveState(); BSONObj toDelete; collection->deleteDocument( txn, loc, false, true, &toDelete ); repl::logOp(txn, "d", ns.c_str(), toDelete); - if (!runner->restoreState(txn)) { - // Runner got killed somehow. This probably shouldn't happen. - if (runnerEOF) { + if (!exec->restoreState(txn)) { + // PlanExecutor got killed somehow. This probably shouldn't happen. + if (execEOF) { // Quote: "We were already at the end. Normal. // TODO: Why is this normal? 
} diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 80fb8d0694e..ae6d16a2998 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -69,16 +69,16 @@ namespace mongo { return cursorStatsOpen.get(); } - ClientCursor::ClientCursor(const Collection* collection, Runner* runner, + ClientCursor::ClientCursor(const Collection* collection, PlanExecutor* exec, int qopts, const BSONObj query) : _collection( collection ), _countedYet( false ) { - _runner.reset(runner); - _ns = runner->ns(); + _exec.reset(exec); + _ns = exec->ns(); _query = query; _queryOptions = qopts; - if ( runner->collection() ) { - invariant( collection == runner->collection() ); + if ( exec->collection() ) { + invariant( collection == exec->collection() ); } init(); } @@ -141,8 +141,8 @@ namespace mongo { } void ClientCursor::kill() { - if ( _runner.get() ) - _runner->kill(); + if ( _exec.get() ) + _exec->kill(); _collection = NULL; } diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 252212a712a..1b0f323c49d 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -33,7 +33,7 @@ #include "mongo/db/diskloc.h" #include "mongo/db/jsobj.h" #include "mongo/db/keypattern.h" -#include "mongo/db/query/runner.h" +#include "mongo/db/query/plan_executor.h" #include "mongo/s/collection_metadata.h" #include "mongo/util/background.h" #include "mongo/util/net/message.h" @@ -57,7 +57,7 @@ namespace mongo { */ class ClientCursor : private boost::noncopyable { public: - ClientCursor(const Collection* collection, Runner* runner, + ClientCursor(const Collection* collection, PlanExecutor* exec, int qopts = 0, const BSONObj query = BSONObj()); ClientCursor(const Collection* collection); @@ -77,7 +77,7 @@ namespace mongo { * goes through killing cursors. * It removes the responsiilibty of de-registering from ClientCursor. * Responsibility for deleting the ClientCursor doesn't change from this call - * see Runner::kill. + * see PlanExecutor::kill. */ void kill(); @@ -115,10 +115,10 @@ namespace mongo { OpTime getSlaveReadTill() const { return _slaveReadTill; } // - // Query-specific functionality that may be adapted for the Runner. + // Query-specific functionality that may be adapted for the PlanExecutor. // - Runner* getRunner() const { return _runner.get(); } + PlanExecutor* getExecutor() const { return _exec.get(); } int queryOptions() const { return _queryOptions; } // Used by ops/query.cpp to stash how many results have been returned by a query. @@ -129,7 +129,7 @@ namespace mongo { /** * Is this ClientCursor backed by an aggregation pipeline. Defaults to false. * - * Agg Runners differ from others in that they manage their own locking internally and + * Agg executors differ from others in that they manage their own locking internally and * should not be killed or destroyed when the underlying collection is deleted. * * Note: This should *not* be set for the internal cursor used as input to an aggregation. @@ -200,7 +200,7 @@ namespace mongo { // // The underlying execution machinery. 
// - scoped_ptr<Runner> _runner; + scoped_ptr<PlanExecutor> _exec; }; /** diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index d2dbf60e565..aaa1674a40a 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -83,24 +83,24 @@ namespace mongo { static_cast<long long>( fromCollection->dataSize() - ( toCollection->getRecordStore()->storageSize( txn ) * 2 ) ); - scoped_ptr<Runner> runner( InternalPlanner::collectionScan(txn, - fromNs, - fromCollection, - InternalPlanner::FORWARD ) ); + scoped_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, + fromNs, + fromCollection, + InternalPlanner::FORWARD ) ); while ( true ) { BSONObj obj; - Runner::RunnerState state = runner->getNext(&obj, NULL); + Runner::RunnerState state = exec->getNext(&obj, NULL); switch( state ) { case Runner::RUNNER_EOF: return Status::OK(); case Runner::RUNNER_DEAD: db->dropCollection( txn, toNs ); - return Status( ErrorCodes::InternalError, "runner turned dead while iterating" ); + return Status( ErrorCodes::InternalError, "executor turned dead while iterating" ); case Runner::RUNNER_ERROR: - return Status( ErrorCodes::InternalError, "runner error while iterating" ); + return Status( ErrorCodes::InternalError, "executor error while iterating" ); case Runner::RUNNER_ADVANCED: if ( excessSize > 0 ) { excessSize -= ( 4 * obj.objsize() ); // 4x is for padding, power of 2, etc... diff --git a/src/mongo/db/commands/count.cpp b/src/mongo/db/commands/count.cpp index f68f7488785..306af80f311 100644 --- a/src/mongo/db/commands/count.cpp +++ b/src/mongo/db/commands/count.cpp @@ -113,6 +113,8 @@ namespace mongo { } try { + ScopedExecutorRegistration safety(exec.get()); + long long count = 0; Runner::RunnerState state; while (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, NULL))) { diff --git a/src/mongo/db/commands/dbhash.cpp b/src/mongo/db/commands/dbhash.cpp index 47405dd9888..2c5e5cee6f9 100644 --- a/src/mongo/db/commands/dbhash.cpp +++ b/src/mongo/db/commands/dbhash.cpp @@ -82,21 +82,21 @@ namespace mongo { IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(); - auto_ptr<Runner> runner; + auto_ptr<PlanExecutor> exec; if ( desc ) { - runner.reset(InternalPlanner::indexScan(opCtx, - collection, - desc, - BSONObj(), - BSONObj(), - false, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH)); + exec.reset(InternalPlanner::indexScan(opCtx, + collection, + desc, + BSONObj(), + BSONObj(), + false, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH)); } else if ( collection->isCapped() ) { - runner.reset(InternalPlanner::collectionScan(opCtx, - fullCollectionName, - collection)); + exec.reset(InternalPlanner::collectionScan(opCtx, + fullCollectionName, + collection)); } else { log() << "can't find _id index for: " << fullCollectionName << endl; @@ -109,8 +109,8 @@ namespace mongo { long long n = 0; Runner::RunnerState state; BSONObj c; - verify(NULL != runner.get()); - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&c, NULL))) { + verify(NULL != exec.get()); + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&c, NULL))) { md5_append( &st , (const md5_byte_t*)c.objdata() , c.objsize() ); n++; } diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index ce582933d7a..27769ffcfee 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -39,9 +39,9 @@ #include "mongo/db/commands.h" #include 
"mongo/db/instance.h" #include "mongo/db/jsobj.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner_common.h" -#include "mongo/db/query/type_explain.h" +#include "mongo/db/query/explain.h" #include "mongo/util/timer.h" namespace mongo { @@ -98,10 +98,6 @@ namespace mongo { BSONArrayBuilder arr( bb ); BSONElementSet values; - long long nscanned = 0; // locations looked at - long long nscannedObjects = 0; // full objects looked at - long long n = 0; // matches - Client::ReadContext ctx(txn, ns); Collection* collection = ctx.ctx().db()->getCollection( txn, ns ); @@ -114,21 +110,20 @@ namespace mongo { return true; } - Runner* rawRunner; - Status status = getRunnerDistinct(txn, collection, query, key, &rawRunner); + PlanExecutor* rawExec; + Status status = getExecutorDistinct(txn, collection, query, key, &rawExec); if (!status.isOK()) { uasserted(17216, mongoutils::str::stream() << "Can't get runner for query " << query << ": " << status.toString()); return 0; } - auto_ptr<Runner> runner(rawRunner); - const ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + const ScopedExecutorRegistration safety(exec.get()); - string cursorName; BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { // Distinct expands arrays. // // If our query is covered, each value of the key should be in the index key and @@ -150,17 +145,10 @@ namespace mongo { values.insert(x); } } - TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); - if (res.isOK()) { - auto_ptr<TypeExplain> explain(bareExplain); - if (explain->isCursorSet()) { - cursorName = explain->getCursor(); - } - n = explain->getN(); - nscanned = explain->getNScanned(); - nscannedObjects = explain->getNScannedObjects(); - } + + // Get summary information about the plan. 
+ PlanSummaryStats stats; + Explain::getSummaryStats(exec.get(), &stats); verify( start == bb.buf() ); @@ -168,11 +156,11 @@ namespace mongo { { BSONObjBuilder b; - b.appendNumber( "n" , n ); - b.appendNumber( "nscanned" , nscanned ); - b.appendNumber( "nscannedObjects" , nscannedObjects ); + b.appendNumber( "n" , stats.nReturned ); + b.appendNumber( "nscanned" , stats.totalKeysExamined ); + b.appendNumber( "nscannedObjects" , stats.totalDocsExamined ); b.appendNumber( "timems" , t.millis() ); - b.append( "cursor" , cursorName ); + b.append( "planSummary" , stats.summaryStr ); result.append( "stats" , b.obj() ); } diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 9df4c363b62..ea90af2cb66 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -40,7 +40,7 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/operation_context_impl.h" #include "mongo/util/log.h" @@ -151,17 +151,18 @@ namespace mongo { massert(17383, "Could not canonicalize " + queryOriginal.toString(), CanonicalQuery::canonicalize(ns, queryOriginal, &cq, whereCallback).isOK()); - Runner* rawRunner; - massert(17384, "Could not get runner for query " + queryOriginal.toString(), - getRunner(txn, collection, cq, &rawRunner, QueryPlannerParams::DEFAULT).isOK()); + PlanExecutor* rawExec; + massert(17384, "Could not get plan executor for query " + queryOriginal.toString(), + getExecutor(txn, collection, cq, &rawExec, QueryPlannerParams::DEFAULT).isOK()); - auto_ptr<Runner> runner(rawRunner); + auto_ptr<PlanExecutor> exec(rawExec); - // Set up automatic yielding - const ScopedRunnerRegistration safety(runner.get()); + // We need to keep this PlanExecutor registration: we are concurrently modifying + // state and may continue doing that with document-level locking (approach is TBD). 
+ const ScopedExecutorRegistration safety(exec.get()); Runner::RunnerState state; - if (Runner::RUNNER_ADVANCED == (state = runner->getNext(&doc, NULL))) { + if (Runner::RUNNER_ADVANCED == (state = exec->getNext(&doc, NULL))) { found = true; } } diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp index 7fb6b972efd..ddb89a4f48a 100644 --- a/src/mongo/db/commands/geo_near_cmd.cpp +++ b/src/mongo/db/commands/geo_near_cmd.cpp @@ -40,8 +40,8 @@ #include "mongo/db/index_names.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/jsobj.h" -#include "mongo/db/query/get_runner.h" -#include "mongo/db/query/type_explain.h" +#include "mongo/db/query/get_executor.h" +#include "mongo/db/query/explain.h" #include "mongo/db/catalog/collection.h" #include "mongo/platform/unordered_map.h" @@ -186,14 +186,14 @@ namespace mongo { return false; } - Runner* rawRunner; - if (!getRunner(txn, collection, cq, &rawRunner, 0).isOK()) { + PlanExecutor* rawExec; + if (!getExecutor(txn, collection, cq, &rawExec, 0).isOK()) { errmsg = "can't get query runner"; return false; } - auto_ptr<Runner> runner(rawRunner); - const ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + const ScopedExecutorRegistration safety(exec.get()); double totalDistance = 0; BSONObjBuilder resultBuilder(result.subarrayStart("results")); @@ -201,7 +201,7 @@ namespace mongo { BSONObj currObj; int results = 0; - while ((results < numWanted) && Runner::RUNNER_ADVANCED == runner->getNext(&currObj, NULL)) { + while ((results < numWanted) && Runner::RUNNER_ADVANCED == exec->getNext(&currObj, NULL)) { // Come up with the correct distance. double dist = currObj["$dis"].number() * distanceMultiplier; @@ -246,13 +246,10 @@ namespace mongo { BSONObjBuilder stats(result.subobjStart("stats")); // Fill in nscanned from the explain. 
- TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); - if (res.isOK()) { - auto_ptr<TypeExplain> explain(bareExplain); - stats.append("nscanned", explain->getNScanned()); - stats.append("objectsLoaded", explain->getNScannedObjects()); - } + PlanSummaryStats summary; + Explain::getSummaryStats(exec.get(), &summary); + stats.appendNumber("nscanned", summary.totalKeysExamined); + stats.appendNumber("objectsLoaded", summary.totalDocsExamined); stats.append("avgDistance", totalDistance / results); stats.append("maxDistance", farthestDist); diff --git a/src/mongo/db/commands/group.cpp b/src/mongo/db/commands/group.cpp index aed8e699e83..9bfa3259a6f 100644 --- a/src/mongo/db/commands/group.cpp +++ b/src/mongo/db/commands/group.cpp @@ -41,7 +41,7 @@ #include "mongo/db/commands.h" #include "mongo/db/catalog/database.h" #include "mongo/db/instance.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/catalog/collection.h" #include "mongo/scripting/engine.h" @@ -146,18 +146,18 @@ namespace mongo { return 0; } - Runner* rawRunner; - if (!getRunner(txn,collection, cq, &rawRunner).isOK()) { - uasserted(17213, "Can't get runner for query " + query.toString()); + PlanExecutor* rawExec; + if (!getExecutor(txn,collection, cq, &rawExec).isOK()) { + uasserted(17213, "Can't get executor for query " + query.toString()); return 0; } - auto_ptr<Runner> runner(rawRunner); - const ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + const ScopedExecutorRegistration safety(exec.get()); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { BSONObj key = getKey(obj , keyPattern , keyFunction , keysize / keynum, s.get() ); keysize += key.objsize(); diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index df0a6e7b9e3..f2501f99e80 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -42,7 +42,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/instance.h" #include "mongo/db/matcher/matcher.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" @@ -987,17 +987,23 @@ namespace mongo { &cq, whereCallback).isOK()); - Runner* rawRunner; - verify(getRunner(_txn, ctx->ctx().db()->getCollection(_txn, _config.incLong), - cq, &rawRunner, QueryPlannerParams::NO_TABLE_SCAN).isOK()); + PlanExecutor* rawExec; + verify(getExecutor(_txn, ctx->ctx().db()->getCollection(_txn, _config.incLong), + cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK()); - auto_ptr<Runner> runner(rawRunner); - const ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + + // This registration is necessary because we may manually yield the read lock + // below (in order to acquire a write lock and dump some data to a temporary + // collection). + // + // TODO: don't do this in the future. 
+ const ScopedExecutorRegistration safety(exec.get()); // iterate over all sorted objects BSONObj o; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&o, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&o, NULL))) { pm.hit(); if ( o.woSortOrder( prev , sortKey ) == 0 ) { @@ -1009,7 +1015,7 @@ namespace mongo { continue; } - runner->saveState(); + exec->saveState(); ctx.reset(); @@ -1022,7 +1028,7 @@ namespace mongo { prev = o; all.push_back( o ); - if (!runner->restoreState(_txn)) { + if (!exec->restoreState(_txn)) { break; } @@ -1330,20 +1336,24 @@ namespace mongo { return 0; } - Runner* rawRunner; - if (!getRunner(txn, ctx->db()->getCollection(txn, config.ns), cq, &rawRunner).isOK()) { - uasserted(17239, "Can't get runner for query " + config.filter.toString()); + PlanExecutor* rawExec; + if (!getExecutor(txn, ctx->db()->getCollection(txn, config.ns), + cq, &rawExec).isOK()) { + uasserted(17239, "Can't get executor for query " + + config.filter.toString()); return 0; } - auto_ptr<Runner> runner(rawRunner); - const ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + + // XXX: is this registration necessary? + const ScopedExecutorRegistration safety(exec.get()); Timer mt; // go through each doc BSONObj o; - while (Runner::RUNNER_ADVANCED == runner->getNext(&o, NULL)) { + while (Runner::RUNNER_ADVANCED == exec->getNext(&o, NULL)) { // check to see if this is a new object we don't own yet // because of a chunk migration if ( collMetadata ) { diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index c10cf053534..fbfda3e8138 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -33,6 +33,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/exec/plan_stage.h" #include "mongo/util/touch_pages.h" namespace mongo { @@ -49,58 +50,59 @@ namespace mongo { size_t size; }; - class MultiIteratorRunner : public Runner { + // XXX: move this to the exec/ directory. 
+ class MultiIteratorStage : public PlanStage { public: - MultiIteratorRunner( const StringData& ns, Collection* collection ) - : _ns( ns.toString() ), - _collection( collection ) { - } - ~MultiIteratorRunner() { - } + MultiIteratorStage(WorkingSet* ws, Collection* collection) + : _collection(collection), + _ws(ws) { } + + ~MultiIteratorStage() { } // takes ownership of it void addIterator(RecordIterator* it) { _iterators.push_back(it); } - virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) { + virtual StageState work(WorkingSetID* out) { if ( _collection == NULL ) - return RUNNER_DEAD; + return PlanStage::DEAD; DiskLoc next = _advance(); if (next.isNull()) - return RUNNER_EOF; + return PlanStage::IS_EOF; - if ( objOut ) - *objOut = _collection->docFor( next ); - if ( dlOut ) - *dlOut = next; - return RUNNER_ADVANCED; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->loc = next; + member->obj = _collection->docFor(next); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + return PlanStage::ADVANCED; } virtual bool isEOF() { return _collection == NULL || _iterators.empty(); } - virtual void kill() { + + void kill() { _collection = NULL; _iterators.clear(); } - virtual void saveState() { + + virtual void prepareToYield() { for (size_t i = 0; i < _iterators.size(); i++) { _iterators[i]->prepareToYield(); } } - virtual bool restoreState(OperationContext* opCtx) { + + virtual void recoverFromYield(OperationContext* opCtx) { for (size_t i = 0; i < _iterators.size(); i++) { if (!_iterators[i]->recoverFromYield()) { kill(); - return false; } } - return true; } - virtual const string& ns() { return _ns; } virtual void invalidate(const DiskLoc& dl, InvalidationType type) { switch ( type ) { case INVALIDATION_DELETION: @@ -113,12 +115,22 @@ namespace mongo { break; } } - virtual const Collection* collection() { - return _collection; - } - virtual Status getInfo(TypeExplain** explain, PlanInfo** planInfo) const { - return Status( ErrorCodes::InternalError, "no" ); + + // + // These should not be used. + // + + virtual PlanStageStats* getStats() { return NULL; } + virtual CommonStats* getCommonStats() { return NULL; } + virtual SpecificStats* getSpecificStats() { return NULL; } + + virtual std::vector<PlanStage*> getChildren() const { + vector<PlanStage*> empty; + return empty; } + + virtual StageType stageType() const { return STAGE_MULTI_ITERATOR; } + private: /** @@ -136,9 +148,11 @@ namespace mongo { return DiskLoc(); } - string _ns; Collection* _collection; OwnedPointerVector<RecordIterator> _iterators; + + // Not owned by us. + WorkingSet* _ws; }; // ------------------------------------------------ @@ -191,23 +205,28 @@ namespace mongo { numCursors = iterators.size(); } - OwnedPointerVector<MultiIteratorRunner> runners; + OwnedPointerVector<PlanExecutor> execs; for ( size_t i = 0; i < numCursors; i++ ) { - runners.push_back(new MultiIteratorRunner(ns.ns(), collection)); + WorkingSet* ws = new WorkingSet(); + MultiIteratorStage* mis = new MultiIteratorStage(ws, collection); + // Takes ownership of 'ws' and 'mis'. + execs.push_back(new PlanExecutor(ws, mis, collection)); } - // transfer iterators to runners using a round-robin distribution. + // transfer iterators to executors using a round-robin distribution. // TODO consider using a common work queue once invalidation issues go away. 
for (size_t i = 0; i < iterators.size(); i++) { - runners[i % runners.size()]->addIterator(iterators.releaseAt(i)); + PlanExecutor* theExec = execs[i % execs.size()]; + MultiIteratorStage* mis = static_cast<MultiIteratorStage*>(theExec->getStages()); + mis->addIterator(iterators.releaseAt(i)); } { BSONArrayBuilder bucketsBuilder; - for (size_t i = 0; i < runners.size(); i++) { - // transfer ownership of a runner to the ClientCursor (which manages its own + for (size_t i = 0; i < execs.size(); i++) { + // transfer ownership of an executor to the ClientCursor (which manages its own // lifetime). - ClientCursor* cc = new ClientCursor( collection, runners.releaseAt(i) ); + ClientCursor* cc = new ClientCursor( collection, execs.releaseAt(i) ); // we are mimicking the aggregation cursor output here // that is why there are ns, ok and empty firstBatch diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 5033dd05da7..de49bc61d1c 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/commands.h" +#include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" @@ -46,7 +47,7 @@ #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/find_constants.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/storage_options.h" namespace mongo { @@ -54,33 +55,46 @@ namespace mongo { namespace { /** - * This is a Runner implementation backed by an aggregation pipeline. + * Stage for pulling results out from an aggregation pipeline. + * + * XXX: move this stage to the exec/ directory. */ - class PipelineRunner : public Runner { + class PipelineProxyStage : public PlanStage { public: - PipelineRunner(intrusive_ptr<Pipeline> pipeline, const boost::shared_ptr<Runner>& child) + PipelineProxyStage(intrusive_ptr<Pipeline> pipeline, + const boost::shared_ptr<PlanExecutor>& child, + WorkingSet* ws) : _pipeline(pipeline) , _includeMetaData(_pipeline->getContext()->inShard) // send metadata to merger - , _childRunner(child) + , _childExec(child) + , _ws(ws) {} - virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) { - if (!objOut || dlOut) - return RUNNER_ERROR; + virtual StageState work(WorkingSetID* out) { + if (!out) { + return PlanStage::FAILURE; + } if (!_stash.empty()) { - *objOut = _stash.back(); + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = _stash.back(); _stash.pop_back(); - return RUNNER_ADVANCED; + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } if (boost::optional<BSONObj> next = getNextBson()) { - *objOut = *next; - return RUNNER_ADVANCED; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = *next; + member->state = WorkingSetMember::OWNED_OBJ; + return PlanStage::ADVANCED; } - return RUNNER_EOF; + return PlanStage::IS_EOF; } + virtual bool isEOF() { if (!_stash.empty()) return false; @@ -92,41 +106,23 @@ namespace { return true; } - virtual const string& ns() { - return _pipeline->getContext()->ns.ns(); - } - - virtual Status getInfo(TypeExplain** explain, - PlanInfo** planInfo) const { - // This should never get called in practice anyway. 
- return Status(ErrorCodes::InternalError, - "PipelineCursor doesn't implement getExplainPlan"); - } - // propagate to child runner if still in use + // propagate to child executor if still in use virtual void invalidate(const DiskLoc& dl, InvalidationType type) { - if (boost::shared_ptr<Runner> runner = _childRunner.lock()) { - runner->invalidate(dl, type); - } - } - virtual void kill() { - if (boost::shared_ptr<Runner> runner = _childRunner.lock()) { - runner->kill(); + if (boost::shared_ptr<PlanExecutor> exec = _childExec.lock()) { + exec->invalidate(dl, type); } } - // Manage our OperationContext. We intentionally don't propagate to child Runner as that is - // handled by DocumentSourceCursor as it needs to. - virtual void saveState() { + // Manage our OperationContext. We intentionally don't propagate to the child + // Runner as that is handled by DocumentSourceCursor as it needs to. + virtual void prepareToYield() { _pipeline->getContext()->opCtx = NULL; } - virtual bool restoreState(OperationContext* opCtx) { + virtual void recoverFromYield(OperationContext* opCtx) { _pipeline->getContext()->opCtx = opCtx; - return true; } - virtual const Collection* collection() { return NULL; } - /** * Make obj the next object returned by getNext(). */ @@ -134,6 +130,23 @@ namespace { _stash.push_back(obj); } + // + // These should not be used. + // + + virtual PlanStageStats* getStats() { return NULL; } + virtual CommonStats* getCommonStats() { return NULL; } + virtual SpecificStats* getSpecificStats() { return NULL; } + + // Not used. + virtual std::vector<PlanStage*> getChildren() const { + vector<PlanStage*> empty; + return empty; + } + + // Not used. + virtual StageType stageType() const { return STAGE_PIPELINE_PROXY; } + private: boost::optional<BSONObj> getNextBson() { if (boost::optional<Document> next = _pipeline->output()->getNext()) { @@ -152,7 +165,10 @@ namespace { const intrusive_ptr<Pipeline> _pipeline; vector<BSONObj> _stash; const bool _includeMetaData; - boost::weak_ptr<Runner> _childRunner; + boost::weak_ptr<PlanExecutor> _childExec; + + // Not owned by us. + WorkingSet* _ws; }; } @@ -185,14 +201,14 @@ namespace { static void handleCursorCommand(OperationContext* txn, const string& ns, ClientCursorPin* pin, - PipelineRunner* runner, + PlanExecutor* exec, const BSONObj& cmdObj, BSONObjBuilder& result) { ClientCursor* cursor = pin ? pin->c() : NULL; if (pin) { invariant(cursor); - invariant(cursor->getRunner() == runner); + invariant(cursor->getExecutor() == exec); invariant(cursor->isAggCursor); } @@ -206,32 +222,34 @@ namespace { const int byteLimit = MaxBytesToReturnToClientAtOnce; BSONObj next; for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineRunner may be very expensive so we don't + // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. - if (runner->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { + if (exec->getNext(&next, NULL) != Runner::RUNNER_ADVANCED) { if (pin) pin->deleteUnderlying(); - // make it an obvious error to use cursor or runner after this point + // make it an obvious error to use cursor or executor after this point cursor = NULL; - runner = NULL; + exec = NULL; break; } if (resultsArray.len() + next.objsize() > byteLimit) { + // Get the pipeline proxy stage wrapped by this PlanExecutor. + PipelineProxyStage* proxy = static_cast<PipelineProxyStage*>(exec->getStages()); // too big. 
next will be the first doc in the second batch - runner->pushBack(next); + proxy->pushBack(next); break; } resultsArray.append(next); } - // NOTE: runner->isEOF() can have side effects such as writing by $out. However, it should + // NOTE: exec->isEOF() can have side effects such as writing by $out. However, it should // be relatively quick since if there was no pin then the input is empty. Also, this // violates the contract for batchSize==0. Sharding requires a cursor to be returned in that // case. This is ok for now however, since you can't have a sharded collection that doesn't // exist. const bool canReturnMoreBatches = pin; - if (!canReturnMoreBatches && runner && !runner->isEOF()) { + if (!canReturnMoreBatches && exec && !exec->isEOF()) { // msgasserting since this shouldn't be possible to trigger from today's aggregation // language. The wording assumes that the only reason pin would be null is if the // collection doesn't exist. @@ -308,13 +326,13 @@ namespace { } #endif - PipelineRunner* runner = NULL; - scoped_ptr<ClientCursorPin> pin; // either this OR the runnerHolder will be non-null - auto_ptr<PipelineRunner> runnerHolder; + PlanExecutor* exec = NULL; + scoped_ptr<ClientCursorPin> pin; // either this OR the execHolder will be non-null + auto_ptr<PlanExecutor> execHolder; { // This will throw if the sharding version for this connection is out of date. The // lock must be held continuously from now until we have we created both the output - // ClientCursor and the input Runner. This ensures that both are using the same + // ClientCursor and the input executor. This ensures that both are using the same // sharding version that we synchronize on here. This is also why we always need to // create a ClientCursor even when we aren't outputting to a cursor. See the comment // on ShardFilterStage for more details. @@ -322,26 +340,37 @@ namespace { Collection* collection = ctx.ctx().db()->getCollection(txn, ns); - // This does mongod-specific stuff like creating the input Runner and adding to the - // front of the pipeline if needed. - boost::shared_ptr<Runner> input = PipelineD::prepareCursorSource(txn, - collection, - pPipeline, - pCtx); + // This does mongod-specific stuff like creating the input PlanExecutor and adding + // it to the front of the pipeline if needed. + boost::shared_ptr<PlanExecutor> input = PipelineD::prepareCursorSource(txn, + collection, + pPipeline, + pCtx); pPipeline->stitch(); - runnerHolder.reset(new PipelineRunner(pPipeline, input)); - runner = runnerHolder.get(); + // Create the PlanExecutor which returns results from the pipeline. The WorkingSet + // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created + // PlanExecutor. + auto_ptr<WorkingSet> ws(new WorkingSet()); + auto_ptr<PipelineProxyStage> proxy( + new PipelineProxyStage(pPipeline, input, ws.get())); + if (NULL == collection) { + execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), ns)); + } + else { + execHolder.reset(new PlanExecutor(ws.release(), proxy.release(), collection)); + } + exec = execHolder.get(); if (!collection && input) { - // If we don't have a collection, we won't be able to register any Runners, so - // make sure that the input Runner (likely an EOFRunner) doesn't need to be - // registered. + // If we don't have a collection, we won't be able to register any executors, so + // make sure that the input PlanExecutor (likely wrapping an EOFStage) doesn't + // need to be registered. 
invariant(!input->collection()); } if (collection) { - ClientCursor* cursor = new ClientCursor(collection, runnerHolder.release()); + ClientCursor* cursor = new ClientCursor(collection, execHolder.release()); cursor->isAggCursor = true; // enable special locking behavior pin.reset(new ClientCursorPin(collection, cursor->cursorid())); // Don't add any code between here and the start of the try block. @@ -357,7 +386,7 @@ namespace { result << "stages" << Value(pPipeline->writeExplainOps()); } else if (isCursorCommand(cmdObj)) { - handleCursorCommand(txn, ns, pin.get(), runner, cmdObj, result); + handleCursorCommand(txn, ns, pin.get(), exec, cmdObj, result); keepCursor = true; } else { diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp index 9036445a88b..fb5f38c7897 100644 --- a/src/mongo/db/commands/test_commands.cpp +++ b/src/mongo/db/commands/test_commands.cpp @@ -150,14 +150,14 @@ namespace mongo { Collection* collection = ctx.ctx().db()->getCollection( txn, nss.ns() ); massert( 13417, "captrunc collection not found or empty", collection); - boost::scoped_ptr<Runner> runner(InternalPlanner::collectionScan(txn, - nss.ns(), - collection, - InternalPlanner::BACKWARD)); + boost::scoped_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(txn, nss.ns(), collection, + InternalPlanner::BACKWARD)); + DiskLoc end; // We remove 'n' elements so the start is one past that for( int i = 0; i < n + 1; ++i ) { - Runner::RunnerState state = runner->getNext(NULL, &end); + Runner::RunnerState state = exec->getNext(NULL, &end); massert( 13418, "captrunc invalid n", Runner::RUNNER_ADVANCED == state); } collection->temp_cappedTruncateAfter( txn, end, inc ); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 076b2fac59d..fad3e98d9da 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -365,10 +365,11 @@ namespace mongo { const string systemIndexes = ctx.db()->name() + ".system.indexes"; Collection* coll = ctx.db()->getCollection( &txn, systemIndexes ); - auto_ptr<Runner> runner(InternalPlanner::collectionScan(&txn, systemIndexes,coll)); + auto_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(&txn, systemIndexes,coll)); BSONObj index; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&index, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&index, NULL))) { const BSONObj key = index.getObjectField("key"); const string plugin = IndexNames::findPluginName(key); diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index e6c6d36ce41..147f86d9161 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -62,7 +62,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/insert.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repair_database.h" @@ -673,23 +673,23 @@ namespace mongo { return 0; } - Runner* rawRunner; - if (!getRunner(txn, coll, cq, &rawRunner, QueryPlannerParams::NO_TABLE_SCAN).isOK()) { - uasserted(17241, "Can't get runner for query " + query.toString()); + PlanExecutor* rawExec; + if (!getExecutor(txn, coll, cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK()) { + uasserted(17241, "Can't get executor for query " + query.toString()); return 0; } - auto_ptr<Runner> runner(rawRunner); + auto_ptr<PlanExecutor> exec(rawExec); - // The runner must be 
registered to be informed of DiskLoc deletions and NS dropping + // The executor must be registered to be informed of DiskLoc deletions and NS dropping // when we yield the lock below. - const ScopedRunnerRegistration safety(runner.get()); + const ScopedExecutorRegistration safety(exec.get()); const ChunkVersion shardVersionAtStart = shardingState.getVersion(ns); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { BSONElement ne = obj["n"]; verify(ne.isNumber()); int myn = ne.numberInt(); @@ -788,7 +788,7 @@ namespace mongo { result.appendBool( "estimate" , estimate ); - auto_ptr<Runner> runner; + auto_ptr<PlanExecutor> exec; if ( min.isEmpty() && max.isEmpty() ) { if ( estimate ) { result.appendNumber( "size" , static_cast<long long>(collection->dataSize()) ); @@ -797,7 +797,7 @@ namespace mongo { result.append( "millis" , timer.millis() ); return 1; } - runner.reset(InternalPlanner::collectionScan(txn, ns,collection)); + exec.reset(InternalPlanner::collectionScan(txn, ns,collection)); } else if ( min.isEmpty() || max.isEmpty() ) { errmsg = "only one of min or max specified"; @@ -822,7 +822,7 @@ namespace mongo { min = Helpers::toKeyFormat( kp.extendRangeBound( min, false ) ); max = Helpers::toKeyFormat( kp.extendRangeBound( max, false ) ); - runner.reset(InternalPlanner::indexScan(txn, collection, idx, min, max, false)); + exec.reset(InternalPlanner::indexScan(txn, collection, idx, min, max, false)); } long long avgObjSize = collection->dataSize() / collection->numRecords(); @@ -835,7 +835,7 @@ namespace mongo { DiskLoc loc; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &loc))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, &loc))) { if ( estimate ) size += avgObjSize; else diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index ae6b560dbad..db0132d5aa0 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -46,7 +46,7 @@ #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/update_result.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/oplog.h" @@ -112,15 +112,15 @@ namespace mongo { massert(17244, "Could not canonicalize " + query.toString(), CanonicalQuery::canonicalize(collection->ns(), query, &cq, whereCallback).isOK()); - Runner* rawRunner; + PlanExecutor* rawExec; size_t options = requireIndex ? 
QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; - massert(17245, "Could not get runner for query " + query.toString(), - getRunner(txn, collection, cq, &rawRunner, options).isOK()); + massert(17245, "Could not get executor for query " + query.toString(), + getExecutor(txn, collection, cq, &rawExec, options).isOK()); - auto_ptr<Runner> runner(rawRunner); + auto_ptr<PlanExecutor> exec(rawExec); Runner::RunnerState state; DiskLoc loc; - if (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &loc))) { + if (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, &loc))) { return loc; } return DiskLoc(); @@ -184,11 +184,10 @@ namespace mongo { */ bool Helpers::getSingleton(OperationContext* txn, const char *ns, BSONObj& result) { Client::Context context(txn, ns); - auto_ptr<Runner> runner(InternalPlanner::collectionScan(txn, - ns, - context.db()->getCollection(txn, - ns))); - Runner::RunnerState state = runner->getNext(&result, NULL); + auto_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(txn, ns, context.db()->getCollection(txn, ns))); + + Runner::RunnerState state = exec->getNext(&result, NULL); context.getClient()->curop()->done(); return Runner::RUNNER_ADVANCED == state; } @@ -196,11 +195,10 @@ namespace mongo { bool Helpers::getLast(OperationContext* txn, const char *ns, BSONObj& result) { Client::Context ctx(txn, ns); Collection* coll = ctx.db()->getCollection( txn, ns ); - auto_ptr<Runner> runner(InternalPlanner::collectionScan(txn, - ns, - coll, - InternalPlanner::BACKWARD)); - Runner::RunnerState state = runner->getNext(&result, NULL); + auto_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(txn, ns, coll, InternalPlanner::BACKWARD)); + + Runner::RunnerState state = exec->getNext(&result, NULL); return Runner::RUNNER_ADVANCED == state; } @@ -362,17 +360,18 @@ namespace mongo { IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern( indexKeyPattern.toBSON() ); - auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, desc, min, max, - maxInclusive, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH)); + auto_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, desc, + min, max, + maxInclusive, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH)); DiskLoc rloc; BSONObj obj; Runner::RunnerState state; // This may yield so we cannot touch nsd after this. 
- state = runner->getNext(&obj, &rloc); - runner.reset(); + state = exec->getNext(&obj, &rloc); + exec.reset(); if (Runner::RUNNER_EOF == state) { break; } if (Runner::RUNNER_DEAD == state) { @@ -520,13 +519,14 @@ namespace mongo { bool isLargeChunk = false; long long docCount = 0; - auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, idx, min, max, false)); + auto_ptr<PlanExecutor> exec( + InternalPlanner::indexScan(txn, collection, idx, min, max, false)); // we can afford to yield here because any change to the base data that we might miss is // already being queued and will be migrated in the 'transferMods' stage DiskLoc loc; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &loc))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, &loc))) { if ( !isLargeChunk ) { locs->insert( loc ); } diff --git a/src/mongo/db/exec/and_hash.cpp b/src/mongo/db/exec/and_hash.cpp index 4dc0b255576..4e43f55da82 100644 --- a/src/mongo/db/exec/and_hash.cpp +++ b/src/mongo/db/exec/and_hash.cpp @@ -440,11 +440,11 @@ namespace mongo { } } - void AndHashStage::recoverFromYield() { + void AndHashStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; for (size_t i = 0; i < _children.size(); ++i) { - _children[i]->recoverFromYield(); + _children[i]->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/and_hash.h b/src/mongo/db/exec/and_hash.h index fc2f24ad4fe..3ae34cca5a7 100644 --- a/src/mongo/db/exec/and_hash.h +++ b/src/mongo/db/exec/and_hash.h @@ -76,7 +76,7 @@ namespace mongo { virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/and_sorted.cpp b/src/mongo/db/exec/and_sorted.cpp index 58d17a2c557..42025074b70 100644 --- a/src/mongo/db/exec/and_sorted.cpp +++ b/src/mongo/db/exec/and_sorted.cpp @@ -265,11 +265,11 @@ namespace mongo { } } - void AndSortedStage::recoverFromYield() { + void AndSortedStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; for (size_t i = 0; i < _children.size(); ++i) { - _children[i]->recoverFromYield(); + _children[i]->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/and_sorted.h b/src/mongo/db/exec/and_sorted.h index 13c8a663426..f3660b17881 100644 --- a/src/mongo/db/exec/and_sorted.h +++ b/src/mongo/db/exec/and_sorted.h @@ -62,7 +62,7 @@ namespace mongo { virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp index f93935fb224..488daf003f3 100644 --- a/src/mongo/db/exec/cached_plan.cpp +++ b/src/mongo/db/exec/cached_plan.cpp @@ -104,13 +104,13 @@ namespace mongo { ++_commonStats.yields; } - void CachedPlanStage::recoverFromYield() { + void CachedPlanStage::recoverFromYield(OperationContext* opCtx) { if (NULL != _backupChildPlan.get()) { - _backupChildPlan->recoverFromYield(); + _backupChildPlan->recoverFromYield(opCtx); } if (! 
_usingBackupChild) { - _mainChildPlan->recoverFromYield(); + _mainChildPlan->recoverFromYield(opCtx); } ++_commonStats.unyields; } diff --git a/src/mongo/db/exec/cached_plan.h b/src/mongo/db/exec/cached_plan.h index 7049f6e35f1..8041e5f3e26 100644 --- a/src/mongo/db/exec/cached_plan.h +++ b/src/mongo/db/exec/cached_plan.h @@ -57,7 +57,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index c424cc6ba6d..a939aee0b84 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -151,7 +151,7 @@ namespace mongo { } } - void CollectionScan::recoverFromYield() { + void CollectionScan::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; if (NULL != _iter) { if (!_iter->recoverFromYield()) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index a6cfa4eb541..f70dc5facbf 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -57,7 +57,7 @@ namespace mongo { virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/count.cpp b/src/mongo/db/exec/count.cpp index 08cd7869baa..092f3bc1006 100644 --- a/src/mongo/db/exec/count.cpp +++ b/src/mongo/db/exec/count.cpp @@ -152,7 +152,7 @@ namespace mongo { _endCursor->savePosition(); } - void Count::recoverFromYield() { + void Count::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; if (_hitEnd || (NULL == _btreeCursor.get())) { return; } diff --git a/src/mongo/db/exec/count.h b/src/mongo/db/exec/count.h index 11e477f8822..60873374cdc 100644 --- a/src/mongo/db/exec/count.h +++ b/src/mongo/db/exec/count.h @@ -63,7 +63,7 @@ namespace mongo { * any WorkingSetMember(s) for any of the data, instead returning ADVANCED to indicate to the * caller that another result should be counted. * - * Only created through the getRunnerCount path, as count is the only operation that doesn't + * Only created through the getExecutorCount path, as count is the only operation that doesn't * care about its data. 
*/ class Count : public PlanStage { @@ -74,7 +74,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/distinct_scan.cpp b/src/mongo/db/exec/distinct_scan.cpp index 7c6e8b1285d..a3d0fe57994 100644 --- a/src/mongo/db/exec/distinct_scan.cpp +++ b/src/mongo/db/exec/distinct_scan.cpp @@ -158,7 +158,7 @@ namespace mongo { _btreeCursor->savePosition(); } - void DistinctScan::recoverFromYield() { + void DistinctScan::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; if (_hitEnd || (NULL == _btreeCursor.get())) { return; } diff --git a/src/mongo/db/exec/distinct_scan.h b/src/mongo/db/exec/distinct_scan.h index 453cb37efac..15935c83cb5 100644 --- a/src/mongo/db/exec/distinct_scan.h +++ b/src/mongo/db/exec/distinct_scan.h @@ -85,7 +85,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/eof.cpp b/src/mongo/db/exec/eof.cpp index 076e5426028..c160f2a5ef7 100644 --- a/src/mongo/db/exec/eof.cpp +++ b/src/mongo/db/exec/eof.cpp @@ -54,7 +54,7 @@ namespace mongo { ++_commonStats.yields; } - void EOFStage::recoverFromYield() { + void EOFStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; } diff --git a/src/mongo/db/exec/eof.h b/src/mongo/db/exec/eof.h index c7050716f3b..5475fa9dbe7 100644 --- a/src/mongo/db/exec/eof.h +++ b/src/mongo/db/exec/eof.h @@ -46,7 +46,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp index 7efade26230..e13cf3857f7 100644 --- a/src/mongo/db/exec/fetch.cpp +++ b/src/mongo/db/exec/fetch.cpp @@ -116,9 +116,9 @@ namespace mongo { _child->prepareToYield(); } - void FetchStage::recoverFromYield() { + void FetchStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void FetchStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h index 7e4c807c1b5..7933fc583ab 100644 --- a/src/mongo/db/exec/fetch.h +++ b/src/mongo/db/exec/fetch.h @@ -56,7 +56,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index 310d08f0a04..5d15b2d7418 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -32,6 +32,7 @@ #include "mongo/client/dbclientinterface.h" #include "mongo/db/exec/projection.h" +#include 
"mongo/db/exec/working_set_computed_data.h" #include "mongo/db/index/btree_access_method.h" #include "mongo/s/d_logic.h" @@ -48,7 +49,14 @@ namespace mongo { _key(query->getQueryObj()["_id"].wrap()), _killed(false), _done(false), - _commonStats(kStageType) { } + _commonStats(kStageType) { + if (NULL != query->getProj()) { + _addKeyMetadata = query->getProj()->wantIndexKey(); + } + else { + _addKeyMetadata = false; + } + } IDHackStage::IDHackStage(OperationContext* txn, Collection* collection, const BSONObj& key, WorkingSet* ws) @@ -58,6 +66,7 @@ namespace mongo { _key(key), _killed(false), _done(false), + _addKeyMetadata(false), _commonStats(kStageType) { } IDHackStage::~IDHackStage() { } @@ -108,6 +117,13 @@ namespace mongo { member->obj = _collection->docFor(loc); member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + if (_addKeyMetadata) { + BSONObjBuilder bob; + BSONObj ownedKeyObj = member->obj["_id"].wrap().getOwned(); + bob.appendKeys(_key, ownedKeyObj); + member->addComputed(new IndexKeyComputedData(bob.obj())); + } + _done = true; ++_commonStats.advanced; *out = id; @@ -118,7 +134,7 @@ namespace mongo { ++_commonStats.yields; } - void IDHackStage::recoverFromYield() { + void IDHackStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; } diff --git a/src/mongo/db/exec/idhack.h b/src/mongo/db/exec/idhack.h index b365374d893..a708a332c27 100644 --- a/src/mongo/db/exec/idhack.h +++ b/src/mongo/db/exec/idhack.h @@ -54,7 +54,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); /** @@ -93,6 +93,9 @@ namespace mongo { // Have we returned our one document? bool _done; + // Do we need to add index key metadata for $returnKey? 
+ bool _addKeyMetadata; + CommonStats _commonStats; IDHackStats _specificStats; }; diff --git a/src/mongo/db/exec/index_scan.cpp b/src/mongo/db/exec/index_scan.cpp index 5e0bd22b899..0be67cda714 100644 --- a/src/mongo/db/exec/index_scan.cpp +++ b/src/mongo/db/exec/index_scan.cpp @@ -248,7 +248,7 @@ namespace mongo { _indexCursor->savePosition(); } - void IndexScan::recoverFromYield() { + void IndexScan::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; if (_hitEnd || (NULL == _indexCursor.get())) { return; } diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h index 50e2336a9f9..40ad7a7315c 100644 --- a/src/mongo/db/exec/index_scan.h +++ b/src/mongo/db/exec/index_scan.h @@ -90,7 +90,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/keep_mutations.cpp b/src/mongo/db/exec/keep_mutations.cpp index 7d452852317..af76962a383 100644 --- a/src/mongo/db/exec/keep_mutations.cpp +++ b/src/mongo/db/exec/keep_mutations.cpp @@ -108,9 +108,9 @@ namespace mongo { _child->prepareToYield(); } - void KeepMutationsStage::recoverFromYield() { + void KeepMutationsStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void KeepMutationsStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/keep_mutations.h b/src/mongo/db/exec/keep_mutations.h index 1a41cef0fb1..fc3bfbd4323 100644 --- a/src/mongo/db/exec/keep_mutations.h +++ b/src/mongo/db/exec/keep_mutations.h @@ -52,7 +52,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/limit.cpp b/src/mongo/db/exec/limit.cpp index bc2c24649c5..d13a378fcb3 100644 --- a/src/mongo/db/exec/limit.cpp +++ b/src/mongo/db/exec/limit.cpp @@ -88,9 +88,9 @@ namespace mongo { _child->prepareToYield(); } - void LimitStage::recoverFromYield() { + void LimitStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void LimitStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/limit.h b/src/mongo/db/exec/limit.h index 9e1ffdc2e85..8a14d4ddea9 100644 --- a/src/mongo/db/exec/limit.h +++ b/src/mongo/db/exec/limit.h @@ -50,7 +50,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/merge_sort.cpp b/src/mongo/db/exec/merge_sort.cpp index b40d2db66a3..c9f449caa53 100644 --- a/src/mongo/db/exec/merge_sort.cpp +++ b/src/mongo/db/exec/merge_sort.cpp @@ -189,10 +189,10 @@ namespace mongo { } } - void MergeSortStage::recoverFromYield() { + void MergeSortStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; for 
(size_t i = 0; i < _children.size(); ++i) { - _children[i]->recoverFromYield(); + _children[i]->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/merge_sort.h b/src/mongo/db/exec/merge_sort.h index bc3852ab16c..1475f811793 100644 --- a/src/mongo/db/exec/merge_sort.h +++ b/src/mongo/db/exec/merge_sort.h @@ -66,7 +66,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/mock_stage.h b/src/mongo/db/exec/mock_stage.h index c5241a669d3..4742cc3aa7a 100644 --- a/src/mongo/db/exec/mock_stage.h +++ b/src/mongo/db/exec/mock_stage.h @@ -59,7 +59,7 @@ namespace mongo { // Some day we could count the # of calls to the yield functions to check that other stages // have correct yielding behavior. virtual void prepareToYield() { } - virtual void recoverFromYield() { } + virtual void recoverFromYield(OperationContext* opCtx) { } virtual void invalidate(const DiskLoc& dl, InvalidationType type) { } virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp index c3749acaa48..c2c394cc21c 100644 --- a/src/mongo/db/exec/multi_plan.cpp +++ b/src/mongo/db/exec/multi_plan.cpp @@ -418,7 +418,7 @@ namespace mongo { } } - void MultiPlanStage::recoverFromYield() { + void MultiPlanStage::recoverFromYield(OperationContext* opCtx) { if (_failure) return; // this logic is from multi_plan_runner @@ -426,13 +426,13 @@ namespace mongo { // the _bestPlan if we've switched to the backup? if (bestPlanChosen()) { - _candidates[_bestPlanIdx].root->recoverFromYield(); + _candidates[_bestPlanIdx].root->recoverFromYield(opCtx); if (hasBackupPlan()) { - _candidates[_backupPlanIdx].root->recoverFromYield(); + _candidates[_backupPlanIdx].root->recoverFromYield(opCtx); } } else { - allPlansRestoreState(); + allPlansRestoreState(opCtx); } } @@ -506,9 +506,9 @@ namespace mongo { } } - void MultiPlanStage::allPlansRestoreState() { + void MultiPlanStage::allPlansRestoreState(OperationContext* opCtx) { for (size_t i = 0; i < _candidates.size(); ++i) { - _candidates[i].root->recoverFromYield(); + _candidates[i].root->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h index d709652f6ee..bb43873d843 100644 --- a/src/mongo/db/exec/multi_plan.h +++ b/src/mongo/db/exec/multi_plan.h @@ -64,7 +64,7 @@ namespace mongo { virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); @@ -140,7 +140,7 @@ namespace mongo { void allPlansSaveState(); - void allPlansRestoreState(); + void allPlansRestoreState(OperationContext* opCtx); static const int kNoSuchPlan = -1; diff --git a/src/mongo/db/exec/near.cpp b/src/mongo/db/exec/near.cpp index 370f7a3d0d0..32cfa6e7464 100644 --- a/src/mongo/db/exec/near.cpp +++ b/src/mongo/db/exec/near.cpp @@ -288,10 +288,10 @@ namespace mongo { } } - void NearStage::recoverFromYield() { + void NearStage::recoverFromYield(OperationContext* opCtx) { ++_stats->common.unyields; if (_nextInterval) { - _nextInterval->covering->recoverFromYield(); + _nextInterval->covering->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/near.h b/src/mongo/db/exec/near.h index 
64cbfe2fd1c..681d86f32a4 100644 --- a/src/mongo/db/exec/near.h +++ b/src/mongo/db/exec/near.h @@ -90,7 +90,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp index 93d4028ed10..e7624b5668b 100644 --- a/src/mongo/db/exec/oplogstart.cpp +++ b/src/mongo/db/exec/oplogstart.cpp @@ -164,9 +164,9 @@ namespace mongo { } } - void OplogStart::recoverFromYield() { + void OplogStart::recoverFromYield(OperationContext* opCtx) { if (_cs) { - _cs->recoverFromYield(); + _cs->recoverFromYield(opCtx); } for (size_t i = 0; i < _subIterators.size(); i++) { diff --git a/src/mongo/db/exec/oplogstart.h b/src/mongo/db/exec/oplogstart.h index 3fc96ac9736..7b0b8d17ecf 100644 --- a/src/mongo/db/exec/oplogstart.h +++ b/src/mongo/db/exec/oplogstart.h @@ -71,7 +71,7 @@ namespace mongo { virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/or.cpp b/src/mongo/db/exec/or.cpp index 2bd97c81389..c82e75a2237 100644 --- a/src/mongo/db/exec/or.cpp +++ b/src/mongo/db/exec/or.cpp @@ -144,10 +144,10 @@ namespace mongo { } } - void OrStage::recoverFromYield() { + void OrStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; for (size_t i = 0; i < _children.size(); ++i) { - _children[i]->recoverFromYield(); + _children[i]->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/or.h b/src/mongo/db/exec/or.h index 53f928dc622..22ce3660dc0 100644 --- a/src/mongo/db/exec/or.h +++ b/src/mongo/db/exec/or.h @@ -55,7 +55,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h index 4187334eb87..4c8f27d3a08 100644 --- a/src/mongo/db/exec/plan_stage.h +++ b/src/mongo/db/exec/plan_stage.h @@ -36,6 +36,7 @@ namespace mongo { class Collection; class DiskLoc; + class OperationContext; /** * A PlanStage ("stage") is the basic building block of a "Query Execution Plan." A stage is @@ -176,6 +177,8 @@ namespace mongo { /** * Notifies the stage that all locks are about to be released. The stage must save any * state required to resume where it was before prepareToYield was called. + * + * XXX: rename to saveState() */ virtual void prepareToYield() = 0; @@ -184,8 +187,13 @@ namespace mongo { * any saved state and be ready to handle calls to work(). * * Can only be called after prepareToYield. + * + * XXX: rename to restoreState() + * + * XXX: We may not need to pass down 'opCtx' if getMore'd queries use the same + * OperationContext they were created with. 
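*
* A minimal sketch of the intended calling pattern (illustrative only, not part
* of this patch; the real yield machinery lives above the stage layer):
*
*     stage->prepareToYield();         // save state; locks are about to go away
*     // ... yield: other operations may delete or move records here ...
*     stage->recoverFromYield(opCtx);  // restore state under the current context
*     stage->work(&wsid);              // safe to resume producing results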
*/ - virtual void recoverFromYield() = 0; + virtual void recoverFromYield(OperationContext* opCtx) = 0; /** * Notifies a stage that a DiskLoc is going to be deleted (or in-place updated) so that the diff --git a/src/mongo/db/exec/projection.cpp b/src/mongo/db/exec/projection.cpp index ecbb03704e2..b21bc14a985 100644 --- a/src/mongo/db/exec/projection.cpp +++ b/src/mongo/db/exec/projection.cpp @@ -240,9 +240,9 @@ namespace mongo { _child->prepareToYield(); } - void ProjectionStage::recoverFromYield() { + void ProjectionStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void ProjectionStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/projection.h b/src/mongo/db/exec/projection.h index 1e79af2bf44..03bb1839102 100644 --- a/src/mongo/db/exec/projection.h +++ b/src/mongo/db/exec/projection.h @@ -84,7 +84,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/shard_filter.cpp b/src/mongo/db/exec/shard_filter.cpp index b427a1d6373..831421aacd2 100644 --- a/src/mongo/db/exec/shard_filter.cpp +++ b/src/mongo/db/exec/shard_filter.cpp @@ -89,9 +89,9 @@ namespace mongo { _child->prepareToYield(); } - void ShardFilterStage::recoverFromYield() { + void ShardFilterStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void ShardFilterStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/shard_filter.h b/src/mongo/db/exec/shard_filter.h index 31f23bee535..f2374b5ddd7 100644 --- a/src/mongo/db/exec/shard_filter.h +++ b/src/mongo/db/exec/shard_filter.h @@ -81,7 +81,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/skip.cpp b/src/mongo/db/exec/skip.cpp index d03d39d9f06..e7e7b1710d1 100644 --- a/src/mongo/db/exec/skip.cpp +++ b/src/mongo/db/exec/skip.cpp @@ -92,9 +92,9 @@ namespace mongo { _child->prepareToYield(); } - void SkipStage::recoverFromYield() { + void SkipStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void SkipStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/skip.h b/src/mongo/db/exec/skip.h index 6ce6ce03ca8..fc8814e2699 100644 --- a/src/mongo/db/exec/skip.h +++ b/src/mongo/db/exec/skip.h @@ -49,7 +49,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/sort.cpp b/src/mongo/db/exec/sort.cpp index d7075cfd09a..2816b9e4815 100644 --- a/src/mongo/db/exec/sort.cpp +++ b/src/mongo/db/exec/sort.cpp @@ -420,9 +420,9 @@ namespace mongo { 
_child->prepareToYield(); } - void SortStage::recoverFromYield() { + void SortStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } void SortStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/sort.h b/src/mongo/db/exec/sort.h index efd0f2fcccc..119f4f59019 100644 --- a/src/mongo/db/exec/sort.h +++ b/src/mongo/db/exec/sort.h @@ -150,7 +150,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp index 4019aae418b..7d2ca07d682 100644 --- a/src/mongo/db/exec/subplan.cpp +++ b/src/mongo/db/exec/subplan.cpp @@ -395,7 +395,16 @@ namespace mongo { if (isEOF()) { return PlanStage::IS_EOF; } invariant(_child.get()); - return _child->work(out); + StageState state = _child->work(out); + + if (PlanStage::NEED_TIME == state) { + ++_commonStats.needTime; + } + else if (PlanStage::ADVANCED == state) { + ++_commonStats.advanced; + } + + return state; } void SubplanStage::prepareToYield() { @@ -411,7 +420,7 @@ namespace mongo { } } - void SubplanStage::recoverFromYield() { + void SubplanStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; if (_killed) { return; @@ -420,7 +429,7 @@ namespace mongo { // We're ranking a sub-plan via an MPR or we're streaming results from this stage. Either // way, pass on the request. if (NULL != _child.get()) { - _child->recoverFromYield(); + _child->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h index 5655e429901..3ff3da921c4 100644 --- a/src/mongo/db/exec/subplan.h +++ b/src/mongo/db/exec/subplan.h @@ -72,7 +72,7 @@ namespace mongo { virtual StageState work(WorkingSetID* out); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp index 3d1cb7560a5..17fa93eff68 100644 --- a/src/mongo/db/exec/text.cpp +++ b/src/mongo/db/exec/text.cpp @@ -112,11 +112,11 @@ namespace mongo { } } - void TextStage::recoverFromYield() { + void TextStage::recoverFromYield(OperationContext* opCtx) { ++_commonStats.unyields; for (size_t i = 0; i < _scanners.size(); ++i) { - _scanners.mutableVector()[i]->recoverFromYield(); + _scanners.mutableVector()[i]->recoverFromYield(opCtx); } } diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h index ffaaa7b2971..288c6e3d23d 100644 --- a/src/mongo/db/exec/text.h +++ b/src/mongo/db/exec/text.h @@ -106,7 +106,7 @@ namespace mongo { virtual bool isEOF(); virtual void prepareToYield(); - virtual void recoverFromYield(); + virtual void recoverFromYield(OperationContext* opCtx); virtual void invalidate(const DiskLoc& dl, InvalidationType type); virtual std::vector<PlanStage*> getChildren() const; diff --git a/src/mongo/db/fts/fts_command_mongod.cpp b/src/mongo/db/fts/fts_command_mongod.cpp index bebfa07cdfc..146d0159faf 100644 --- a/src/mongo/db/fts/fts_command_mongod.cpp +++ b/src/mongo/db/fts/fts_command_mongod.cpp @@ -36,8 +36,8 @@ #include "mongo/db/catalog/database.h" 
#include "mongo/db/fts/fts_command.h" #include "mongo/db/fts/fts_util.h" -#include "mongo/db/query/get_runner.h" -#include "mongo/db/query/type_explain.h" +#include "mongo/db/query/get_executor.h" +#include "mongo/db/query/explain.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/timer.h" @@ -112,14 +112,15 @@ namespace mongo { return false; } - Runner* rawRunner; - Status getRunnerStatus = getRunner(txn, ctx.ctx().db()->getCollection(txn, ns), cq, &rawRunner); - if (!getRunnerStatus.isOK()) { - errmsg = getRunnerStatus.reason(); + PlanExecutor* rawExec; + Status getExecStatus = getExecutor( + txn, ctx.ctx().db()->getCollection(txn, ns), cq, &rawExec); + if (!getExecStatus.isOK()) { + errmsg = getExecStatus.reason(); return false; } - auto_ptr<Runner> runner(rawRunner); + auto_ptr<PlanExecutor> exec(rawExec); BSONArrayBuilder resultBuilder(result.subarrayStart("results")); @@ -129,7 +130,7 @@ namespace mongo { int numReturned = 0; BSONObj obj; - while (Runner::RUNNER_ADVANCED == runner->getNext(&obj, NULL)) { + while (Runner::RUNNER_ADVANCED == exec->getNext(&obj, NULL)) { if ((resultSize + obj.objsize()) >= BSONObjMaxUserSize) { break; } @@ -158,13 +159,10 @@ namespace mongo { BSONObjBuilder stats(result.subobjStart("stats")); // Fill in nscanned from the explain. - TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); - if (res.isOK()) { - auto_ptr<TypeExplain> explain(bareExplain); - stats.append("nscanned", explain->getNScanned()); - stats.append("nscannedObjects", explain->getNScannedObjects()); - } + PlanSummaryStats summary; + Explain::getSummaryStats(exec.get(), &summary); + stats.appendNumber("nscanned", summary.totalKeysExamined); + stats.appendNumber("nscannedObjects", summary.totalDocsExamined); stats.appendNumber( "n" , numReturned ); stats.append( "timeMicros", (int)comm.micros() ); diff --git a/src/mongo/db/index/haystack_access_method.cpp b/src/mongo/db/index/haystack_access_method.cpp index f849e708d5b..8d2e72a474f 100644 --- a/src/mongo/db/index/haystack_access_method.cpp +++ b/src/mongo/db/index/haystack_access_method.cpp @@ -100,11 +100,11 @@ namespace mongo { unordered_set<DiskLoc, DiskLoc::Hasher> thisPass; - scoped_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, + scoped_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, _descriptor, key, key, true)); Runner::RunnerState state; DiskLoc loc; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &loc))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, &loc))) { if (hopper.limitReached()) { break; } pair<unordered_set<DiskLoc, DiskLoc::Hasher>::iterator, bool> p = thisPass.insert(loc); diff --git a/src/mongo/db/ops/delete_executor.cpp b/src/mongo/db/ops/delete_executor.cpp index d2c36f3ba3c..17b2123c8f1 100644 --- a/src/mongo/db/ops/delete_executor.cpp +++ b/src/mongo/db/ops/delete_executor.cpp @@ -36,7 +36,7 @@ #include "mongo/db/curop.h" #include "mongo/db/ops/delete_request.h" #include "mongo/db/query/canonical_query.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/repl/repl_coordinator_global.h" @@ -117,31 +117,31 @@ namespace mongo { long long nDeleted = 0; - Runner* rawRunner; + PlanExecutor* rawExec; if (_canonicalQuery.get()) { - uassertStatusOK(getRunner(_request->getOpCtx(), - collection, - _canonicalQuery.release(), - &rawRunner)); + 
uassertStatusOK(getExecutor(_request->getOpCtx(), + collection, + _canonicalQuery.release(), + &rawExec)); } else { - CanonicalQuery* ignored; - uassertStatusOK(getRunner(_request->getOpCtx(), - collection, - ns.ns(), - _request->getQuery(), - &rawRunner, - &ignored)); + uassertStatusOK(getExecutor(_request->getOpCtx(), + collection, + ns.ns(), + _request->getQuery(), + &rawExec)); } - auto_ptr<Runner> runner(rawRunner); - ScopedRunnerRegistration safety(runner.get()); + auto_ptr<PlanExecutor> exec(rawExec); + + // Concurrently mutating state (by us) so we need to register 'exec'. + ScopedExecutorRegistration safety(exec.get()); DiskLoc rloc; Runner::RunnerState state; CurOp* curOp = _request->getOpCtx()->getCurOp(); int oldYieldCount = curOp->numYields(); - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &rloc))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(NULL, &rloc))) { if (oldYieldCount != curOp->numYields()) { uassert(ErrorCodes::NotMaster, str::stream() << "No longer primary while removing from " << ns.ns(), @@ -154,10 +154,10 @@ namespace mongo { // TODO: do we want to buffer docs and delete them in a group rather than // saving/restoring state repeatedly? - runner->saveState(); + exec->saveState(); collection->deleteDocument( _request->getOpCtx(), rloc, false, false, logop ? &toDelete : NULL); - runner->restoreState(_request->getOpCtx()); + exec->restoreState(_request->getOpCtx()); nDeleted++; diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index f872f39a97d..4ca1367465c 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -466,6 +466,9 @@ namespace mongo { // Create the plan executor and setup all deps. auto_ptr<PlanExecutor> exec(rawExec); + // Register executor with the collection cursor cache. + const ScopedExecutorRegistration safety(exec.get()); + // Get the canonical query which the underlying executor is using. This may be NULL in // the case of idhack updates. cq = exec->getCanonicalQuery(); @@ -667,7 +670,7 @@ namespace mongo { // Restore state after modification uassert(17278, "Update could not restore plan executor state after updating a document.", - exec->restoreState()); + exec->restoreState(request.getOpCtx())); // Call logOp if requested. if (request.shouldCallLogOp() && !logObj.isEmpty()) { diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index ca3e93a9ce4..5c34eae6fd5 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -55,7 +55,7 @@ namespace mongo { class ExpressionFieldPath; class ExpressionObject; class DocumentSourceLimit; - class Runner; + class PlanExecutor; class DocumentSource : public IntrusiveCounterUnsigned { public: @@ -334,7 +334,9 @@ namespace mongo { /** - * Constructs and returns Documents from the BSONObj objects produced by a supplied Runner. + * Constructs and returns Documents from the BSONObj objects produced by a supplied + * PlanExecutor. + * * An object of this type may only be used by one thread, see SERVER-6123. */ class DocumentSourceCursor : @@ -351,14 +353,14 @@ namespace mongo { virtual void dispose(); /** - * Create a document source based on a passed-in Runner. + * Create a document source based on a passed-in PlanExecutor. * * This is usually put at the beginning of a chain of document sources * in order to fetch data from the database. 
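*
* Hypothetical usage (a sketch only; PipelineD::prepareCursorSource is the real
* call site, and 'exec' is assumed to be a saved, yield-ready executor):
*
*     boost::shared_ptr<PlanExecutor> exec = ...;
*     intrusive_ptr<DocumentSourceCursor> source =
*         DocumentSourceCursor::create(ns, exec, pExpCtx);
*     pPipeline->addInitialSource(source);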
*/ static intrusive_ptr<DocumentSourceCursor> create( const std::string& ns, - const boost::shared_ptr<Runner>& runner, + const boost::shared_ptr<PlanExecutor>& exec, const intrusive_ptr<ExpressionContext> &pExpCtx); /* @@ -402,7 +404,7 @@ namespace mongo { private: DocumentSourceCursor( const std::string& ns, - const boost::shared_ptr<Runner>& runner, + const boost::shared_ptr<PlanExecutor>& exec, const intrusive_ptr<ExpressionContext> &pExpCtx); void loadBatch(); @@ -418,7 +420,7 @@ namespace mongo { long long _docsAddedToBatches; // for _limit enforcement const std::string _ns; - boost::shared_ptr<Runner> _runner; // PipelineRunner holds a weak_ptr to this. + boost::shared_ptr<PlanExecutor> _exec; // PipelineRunner holds a weak_ptr to this. }; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 6c4d8c5d6c0..b037e9b7f06 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -33,6 +33,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/instance.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/query/explain.h" #include "mongo/db/query/find_constants.h" #include "mongo/db/query/type_explain.h" #include "mongo/db/storage_options.h" @@ -65,29 +66,29 @@ namespace mongo { } void DocumentSourceCursor::dispose() { - // Can't call in to Runner or ClientCursor registries from this function since it will be - // called when an agg cursor is killed which would cause a deadlock. - _runner.reset(); + // Can't call in to PlanExecutor or ClientCursor registries from this function since it + // will be called when an agg cursor is killed which would cause a deadlock. + _exec.reset(); _currentBatch.clear(); } void DocumentSourceCursor::loadBatch() { - if (!_runner) { + if (!_exec) { dispose(); return; } - // We have already validated the sharding version when we constructed the Runner + // We have already validated the sharding version when we constructed the PlanExecutor // so we shouldn't check it again. Lock::DBRead lk(pExpCtx->opCtx->lockState(), _ns); Client::Context ctx(pExpCtx->opCtx, _ns, /*doVersion=*/false); - _runner->restoreState(pExpCtx->opCtx); + _exec->restoreState(pExpCtx->opCtx); int memUsageBytes = 0; BSONObj obj; Runner::RunnerState state; - while ((state = _runner->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) { + while ((state = _exec->getNext(&obj, NULL)) == Runner::RUNNER_ADVANCED) { if (_dependencies) { _currentBatch.push_back(_dependencies->extractFields(obj)); } @@ -105,15 +106,15 @@ namespace mongo { memUsageBytes += _currentBatch.back().getApproximateSize(); if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { - // End this batch and prepare Runner for yielding. - _runner->saveState(); + // End this batch and prepare PlanExecutor for yielding. + _exec->saveState(); return; } } - // If we got here, there won't be any more documents, so destroy the runner. Can't use + // If we got here, there won't be any more documents, so destroy the executor. Can't use // dispose since we want to keep the _currentBatch. 
- _runner.reset(); + _exec.reset(); uassert(16028, "collection or index disappeared when cursor yielded", state != Runner::RUNNER_DEAD); @@ -121,7 +122,7 @@ namespace mongo { uassert(17285, "cursor encountered an error: " + WorkingSetCommon::toStatusString(obj), state != Runner::RUNNER_ERROR); - massert(17286, str::stream() << "Unexpected return from Runner::getNext: " << state, + massert(17286, str::stream() << "Unexpected return from PlanExecutor::getNext: " << state, state == Runner::RUNNER_EOF || state == Runner::RUNNER_ADVANCED); } @@ -202,17 +203,17 @@ namespace { Lock::DBRead lk(pExpCtx->opCtx->lockState(), _ns); Client::Context ctx(pExpCtx->opCtx, _ns, /*doVersion=*/ false); - massert(17392, "No _runner. Were we disposed before explained?", - _runner); + massert(17392, "No _exec. Were we disposed before explained?", + _exec); - _runner->restoreState(pExpCtx->opCtx); + _exec->restoreState(pExpCtx->opCtx); TypeExplain* explainRaw; - explainStatus = _runner->getInfo(&explainRaw, NULL); + explainStatus = Explain::legacyExplain(_exec.get(), &explainRaw); if (explainStatus.isOK()) plan.reset(explainRaw); - _runner->saveState(); + _exec->saveState(); } MutableDocument out; @@ -237,19 +238,19 @@ namespace { } DocumentSourceCursor::DocumentSourceCursor(const string& ns, - const boost::shared_ptr<Runner>& runner, + const boost::shared_ptr<PlanExecutor>& exec, const intrusive_ptr<ExpressionContext> &pCtx) : DocumentSource(pCtx) , _docsAddedToBatches(0) , _ns(ns) - , _runner(runner) + , _exec(exec) {} intrusive_ptr<DocumentSourceCursor> DocumentSourceCursor::create( const string& ns, - const boost::shared_ptr<Runner>& runner, + const boost::shared_ptr<PlanExecutor>& exec, const intrusive_ptr<ExpressionContext> &pExpCtx) { - return new DocumentSourceCursor(ns, runner, pExpCtx); + return new DocumentSourceCursor(ns, exec, pExpCtx); } void DocumentSourceCursor::setProjection( diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 9259a2a8072..27dd0bb38c0 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -36,7 +36,7 @@ #include "mongo/db/instance.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" #include "mongo/s/d_logic.h" @@ -74,7 +74,7 @@ namespace { }; } - boost::shared_ptr<Runner> PipelineD::prepareCursorSource( + boost::shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( OperationContext* txn, Collection* collection, const intrusive_ptr<Pipeline>& pPipeline, @@ -104,7 +104,7 @@ namespace { // on secondaries, this is needed. ShardedConnectionInfo::addHook(); } - return boost::shared_ptr<Runner>(); // don't need a cursor + return boost::shared_ptr<PlanExecutor>(); // don't need a cursor } @@ -142,19 +142,19 @@ namespace { } } - // Create the Runner. + // Create the PlanExecutor. // - // If we try to create a Runner that includes both the match and the + // If we try to create a PlanExecutor that includes both the match and the // sort, and the two are incompatible wrt the available indexes, then - // we don't get a Runner back. + // we don't get a PlanExecutor back. // // So we try to use both first. If that fails, try again, without the // sort. // - // If we don't have a sort, jump straight to just creating a Runner + // If we don't have a sort, jump straight to just creating a PlanExecutor. // without the sort. 
// - // If we are able to incorporate the sort into the Runner, remove it + // If we are able to incorporate the sort into the PlanExecutor, remove it // from the head of the pipeline. // // LATER - we should be able to find this out before we create the @@ -164,7 +164,7 @@ namespace { | QueryPlannerParams::INCLUDE_SHARD_FILTER | QueryPlannerParams::NO_BLOCKING_SORT ; - boost::shared_ptr<Runner> runner; + boost::shared_ptr<PlanExecutor> exec; bool sortInRunner = false; const WhereCallbackReal whereCallback(pExpCtx->opCtx, pExpCtx->ns.db()); @@ -178,10 +178,10 @@ namespace { projectionForQuery, &cq, whereCallback); - Runner* rawRunner; - if (status.isOK() && getRunner(txn, collection, cq, &rawRunner, runnerOptions).isOK()) { - // success: The Runner will handle sorting for us using an index. - runner.reset(rawRunner); + PlanExecutor* rawExec; + if (status.isOK() && getExecutor(txn, collection, cq, &rawExec, runnerOptions).isOK()) { + // success: The PlanExecutor will handle sorting for us using an index. + exec.reset(rawExec); sortInRunner = true; sources.pop_front(); @@ -192,7 +192,7 @@ namespace { } } - if (!runner.get()) { + if (!exec.get()) { const BSONObj noSort; CanonicalQuery* cq; uassertStatusOK( @@ -203,18 +203,18 @@ namespace { &cq, whereCallback)); - Runner* rawRunner; - uassertStatusOK(getRunner(txn, collection, cq, &rawRunner, runnerOptions)); - runner.reset(rawRunner); + PlanExecutor* rawExec; + uassertStatusOK(getExecutor(txn, collection, cq, &rawExec, runnerOptions)); + exec.reset(rawExec); } - // DocumentSourceCursor expects a yielding Runner that has had its state saved. - runner->saveState(); + // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. + exec->saveState(); - // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline. + // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(fullName, runner, pExpCtx); + DocumentSourceCursor::create(fullName, exec, pExpCtx); // Note the query, sort, and projection for explain. pSource->setQuery(queryObj); @@ -229,7 +229,7 @@ namespace { pPipeline->addInitialSource(pSource); - return runner; + return exec; } } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index b9d85a69dbb..1147755f5b6 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -36,7 +36,7 @@ namespace mongo { struct ExpressionContext; class OperationContext; class Pipeline; - class Runner; + class PlanExecutor; /* PipelineD is an extension of the Pipeline class, but with additional @@ -65,13 +65,13 @@ namespace mongo { * * Must have a ReadContext before entering. * - * If the returned Runner is non-null, you are responsible for ensuring + * If the returned PlanExecutor is non-null, you are responsible for ensuring * it receives appropriate invalidate and kill messages. 
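*
* Illustrative call (a sketch only; parameter names follow this header):
*
*     boost::shared_ptr<PlanExecutor> exec =
*         PipelineD::prepareCursorSource(txn, collection, pPipeline, pExpCtx);
*     if (exec) {
*         // The caller keeps 'exec' alive and forwards invalidate()/kill()
*         // events to it, e.g. via the collection cursor cache.
*     }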
* * @param pPipeline the logical "this" for this operation * @param pExpCtx the expression context for this pipeline */ - static boost::shared_ptr<Runner> prepareCursorSource( + static boost::shared_ptr<PlanExecutor> prepareCursorSource( OperationContext* txn, Collection* collection, const intrusive_ptr<Pipeline> &pPipeline, diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 510a86fe1c6..f1c8859ec18 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -37,7 +37,6 @@ env.Library( "get_executor.cpp", "get_runner.cpp", "idhack_runner.cpp", - "internal_runner.cpp", "new_find.cpp", "plan_executor.cpp", "plan_ranker.cpp", diff --git a/src/mongo/db/query/eof_runner.cpp b/src/mongo/db/query/eof_runner.cpp index 1438ccd5b6b..e56d380d791 100644 --- a/src/mongo/db/query/eof_runner.cpp +++ b/src/mongo/db/query/eof_runner.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #include "mongo/db/query/eof_runner.h" #include "mongo/db/diskloc.h" diff --git a/src/mongo/db/query/eof_runner.h b/src/mongo/db/query/eof_runner.h index 983c4b206a7..a24d83fbbda 100644 --- a/src/mongo/db/query/eof_runner.h +++ b/src/mongo/db/query/eof_runner.h @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #pragma once #include <boost/scoped_ptr.hpp> diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index 98c2d40fe69..b2021abde91 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -31,6 +31,7 @@ #include "mongo/db/query/explain.h" #include "mongo/db/exec/multi_plan.h" +#include "mongo/db/query/explain_plan.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_planner.h" @@ -114,6 +115,10 @@ namespace { const CountStats* spec = static_cast<const CountStats*>(specific); return spec->keysExamined; } + else if (STAGE_DISTINCT == type) { + const DistinctScanStats* spec = static_cast<const DistinctScanStats*>(specific); + return spec->keysExamined; + } return 0; } @@ -368,7 +373,7 @@ namespace mongo { size_t totalKeysExamined = 0; size_t totalDocsExamined = 0; for (size_t i = 0; i < statsNodes.size(); ++i) { - + totalKeysExamined += getKeysExamined(statsNodes[i]->stageType, statsNodes[i]->specific.get()); totalDocsExamined += getDocsExamined(statsNodes[i]->stageType, @@ -546,4 +551,89 @@ namespace mongo { statsOut->summaryStr = ss; } + // TODO: This is temporary and should get deleted. There are a few small ways in which + // this differs from 2.6 explain, but I'm not too worried because this entire format is + // going away soon: + // 1) 'indexBounds' field excluded from idhack explain. + // 2) 'filterSet' field (for index filters) excluded. + Status Explain::legacyExplain(PlanExecutor* exec, TypeExplain** explain) { + invariant(exec); + invariant(explain); + + scoped_ptr<PlanStageStats> stats(exec->getStats()); + if (NULL == stats.get()) { + return Status(ErrorCodes::InternalError, "no stats available to explain plan"); + } + + // Special explain format for EOF. + if (STAGE_EOF == stats->stageType) { + *explain = new TypeExplain(); + + // Fill in mandatory fields. + (*explain)->setN(0); + (*explain)->setNScannedObjects(0); + (*explain)->setNScanned(0); + + // Fill in all the main fields that don't have a default in the explain data structure. 
+ (*explain)->setCursor("BasicCursor"); + (*explain)->setScanAndOrder(false); + (*explain)->setIsMultiKey(false); + (*explain)->setIndexOnly(false); + (*explain)->setNYields(0); + (*explain)->setNChunkSkips(0); + + TypeExplain* allPlans = new TypeExplain; + allPlans->setCursor("BasicCursor"); + (*explain)->addToAllPlans(allPlans); // ownership xfer + + (*explain)->setNScannedObjectsAllPlans(0); + (*explain)->setNScannedAllPlans(0); + + return Status::OK(); + } + + // Special explain format for idhack. + vector<PlanStageStats*> statNodes; + flattenStatsTree(stats.get(), &statNodes); + PlanStageStats* idhack = NULL; + for (size_t i = 0; i < statNodes.size(); i++) { + if (STAGE_IDHACK == statNodes[i]->stageType) { + idhack = statNodes[i]; + break; + } + } + + if (NULL != idhack) { + // Explain format does not match 2.4 and is intended + // to indicate clearly that the ID hack has been applied. + *explain = new TypeExplain(); + + IDHackStats* idhackStats = static_cast<IDHackStats*>(idhack->specific.get()); + + (*explain)->setCursor("IDCursor"); + (*explain)->setIDHack(true); + (*explain)->setN(stats->common.advanced); + (*explain)->setNScanned(idhackStats->keysExamined); + (*explain)->setNScannedObjects(idhackStats->docsExamined); + + return Status::OK(); + } + + Status status = explainPlan(*stats, explain, true /* full details */); + if (!status.isOK()) { + return status; + } + + // Fill in explain fields that are accounted for at the runner level. + TypeExplain* chosenPlan = NULL; + explainPlan(*stats, &chosenPlan, false /* no full details */); + if (chosenPlan) { + (*explain)->addToAllPlans(chosenPlan); + } + (*explain)->setNScannedObjectsAllPlans((*explain)->getNScannedObjects()); + (*explain)->setNScannedAllPlans((*explain)->getNScanned()); + + return Status::OK(); + } + } // namespace mongo diff --git a/src/mongo/db/query/explain.h b/src/mongo/db/query/explain.h index ea57c48eef3..da9d3daff21 100644 --- a/src/mongo/db/query/explain.h +++ b/src/mongo/db/query/explain.h @@ -34,6 +34,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/query_solution.h" +#include "mongo/db/query/type_explain.h" namespace mongo { @@ -146,6 +147,16 @@ namespace mongo { */ static void explainCountEmptyQuery(BSONObjBuilder* out); + /** + * Generate the legacy explain format from a PlanExecutor. + * + * On success, the caller owns 'explain'. + * + * TODO: THIS IS TEMPORARY. Once the legacy explain code is deleted, we won't + * need this anymore. + */ + static Status legacyExplain(PlanExecutor* exec, TypeExplain** explain); + private: /** * Converts the stats tree 'stats' into a corresponding BSON object containing diff --git a/src/mongo/db/query/explain_plan.cpp b/src/mongo/db/query/explain_plan.cpp index ccc662e0c7e..db9f7b82f40 100644 --- a/src/mongo/db/query/explain_plan.cpp +++ b/src/mongo/db/query/explain_plan.cpp @@ -436,7 +436,7 @@ namespace mongo { } // Common details.
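// (Illustrative aside, not part of this patch: a call site for the
// legacyExplain() declared above looks roughly like the pattern used in
// document_source_cursor.cpp:
//
//     TypeExplain* rawExplain;
//     Status explainStatus = Explain::legacyExplain(exec.get(), &rawExplain);
//     if (explainStatus.isOK()) {
//         scoped_ptr<TypeExplain> explain(rawExplain);  // caller takes ownership
//     }
// )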
- bob->append("type", stageTypeString(stats.stageType)); + bob->append("type", stats.common.stageTypeStr); bob->appendNumber("works", stats.common.works); bob->appendNumber("yields", stats.common.yields); bob->appendNumber("unyields", stats.common.unyields); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 4b56863eeaa..6a3a5bfffde 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -93,7 +93,7 @@ namespace mongo { } namespace { - // The body is below in the "count hack" section but getRunner calls it. + // The body is below in the "count hack" section but getExecutor calls it. bool turnIxscanIntoCount(QuerySolution* soln); } // namespace @@ -174,7 +174,7 @@ namespace mongo { << " Using EOF stage: " << unparsedQuery.toString(); EOFStage* eofStage = new EOFStage(); WorkingSet* ws = new WorkingSet(); - *out = new PlanExecutor(ws, eofStage, collection); + *out = new PlanExecutor(ws, eofStage, ns); return Status::OK(); } diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index a1abfaaf4e0..f5ae4a4ef97 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -39,7 +39,7 @@ namespace mongo { /** * Filter indexes retrieved from index catalog by * allowed indices in query settings. - * Used by getRunner(). + * Used by getExecutor(). * This function is public to facilitate testing. */ void filterAllowedIndexEntries(const AllowedIndices& allowedIndices, diff --git a/src/mongo/db/query/get_runner.cpp b/src/mongo/db/query/get_runner.cpp index e93d670a898..dcaca93dc7d 100644 --- a/src/mongo/db/query/get_runner.cpp +++ b/src/mongo/db/query/get_runner.cpp @@ -685,16 +685,4 @@ namespace mongo { return getRunner(txn, collection, cq, out); } - ScopedRunnerRegistration::ScopedRunnerRegistration(Runner* runner) - : _runner(runner) { - // Collection can be null for EOFRunner, or other places where registration is not needed - if ( _runner->collection() ) - _runner->collection()->cursorCache()->registerRunner( runner ); - } - - ScopedRunnerRegistration::~ScopedRunnerRegistration() { - if ( _runner->collection() ) - _runner->collection()->cursorCache()->deregisterRunner( _runner ); - } - } // namespace mongo diff --git a/src/mongo/db/query/get_runner.h b/src/mongo/db/query/get_runner.h index 2d9dd75c88e..4740e2d65ef 100644 --- a/src/mongo/db/query/get_runner.h +++ b/src/mongo/db/query/get_runner.h @@ -107,22 +107,4 @@ namespace mongo { const QueryPlannerParams& plannerParams, Runner** out); - /** - * RAII approach to ensuring that runners are deregistered in newRunQuery. - * - * While retrieving the first batch of results, newRunQuery manually registers the runner with - * ClientCursor. Certain query execution paths, namely $where, can throw an exception. If we - * fail to deregister the runner, we will call invalidate/kill on the - * still-registered-yet-deleted runner. - * - * For any subsequent calls to getMore, the runner is already registered with ClientCursor - * by virtue of being cached, so this exception-proofing is not required. - */ - struct ScopedRunnerRegistration { - ScopedRunnerRegistration(Runner* runner); - ~ScopedRunnerRegistration(); - - Runner* const _runner; - }; - } // namespace mongo diff --git a/src/mongo/db/query/idhack_runner.cpp b/src/mongo/db/query/idhack_runner.cpp index cf8f8dc78f5..814cc1bf9c8 100644 --- a/src/mongo/db/query/idhack_runner.cpp +++ b/src/mongo/db/query/idhack_runner.cpp @@ -26,6 +26,8 @@ * it in the license file. 
*/ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #include "mongo/db/query/idhack_runner.h" #include "mongo/client/dbclientinterface.h" diff --git a/src/mongo/db/query/idhack_runner.h b/src/mongo/db/query/idhack_runner.h index a6747a6d83b..3024bc6b2b7 100644 --- a/src/mongo/db/query/idhack_runner.h +++ b/src/mongo/db/query/idhack_runner.h @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #pragma once #include <boost/scoped_ptr.hpp> diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index dae7af1c521..9cfff406b8f 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -31,10 +31,10 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/eof.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" -#include "mongo/db/query/eof_runner.h" -#include "mongo/db/query/internal_runner.h" +#include "mongo/db/query/plan_executor.h" namespace mongo { @@ -64,13 +64,16 @@ namespace mongo { /** * Return a collection scan. Caller owns pointer. */ - static Runner* collectionScan(OperationContext* txn, - const StringData& ns, - Collection* collection, - const Direction direction = FORWARD, - const DiskLoc startLoc = DiskLoc()) { + static PlanExecutor* collectionScan(OperationContext* txn, + const StringData& ns, + Collection* collection, + const Direction direction = FORWARD, + const DiskLoc startLoc = DiskLoc()) { + WorkingSet* ws = new WorkingSet(); + if (NULL == collection) { - return new EOFRunner(NULL, ns.toString()); + EOFStage* eof = new EOFStage(); + return new PlanExecutor(ws, eof, ns.toString()); } dassert( ns == collection->ns().ns() ); @@ -86,20 +89,22 @@ namespace mongo { params.direction = CollectionScanParams::BACKWARD; } - WorkingSet* ws = new WorkingSet(); CollectionScan* cs = new CollectionScan(txn, params, ws, NULL); - return new InternalRunner(collection, cs, ws); + PlanExecutor* exec = new PlanExecutor(ws, cs, collection); + // 'exec' will be registered until it is destroyed. + exec->registerExecInternalPlan(); + return exec; } /** * Return an index scan. Caller owns returned pointer. */ - static Runner* indexScan(OperationContext* txn, - const Collection* collection, - const IndexDescriptor* descriptor, - const BSONObj& startKey, const BSONObj& endKey, - bool endKeyInclusive, Direction direction = FORWARD, - int options = 0) { + static PlanExecutor* indexScan(OperationContext* txn, + const Collection* collection, + const IndexDescriptor* descriptor, + const BSONObj& startKey, const BSONObj& endKey, + bool endKeyInclusive, Direction direction = FORWARD, + int options = 0) { invariant(collection); invariant(descriptor); @@ -114,13 +119,16 @@ namespace mongo { WorkingSet* ws = new WorkingSet(); IndexScan* ix = new IndexScan(txn, params, ws, NULL); + PlanStage* root = ix; + if (IXSCAN_FETCH & options) { - return new InternalRunner( - collection, new FetchStage(ws, ix, NULL, collection), ws); - } - else { - return new InternalRunner(collection, ix, ws); + root = new FetchStage(ws, root, NULL, collection); } + + PlanExecutor* exec = new PlanExecutor(ws, root, collection); + // 'exec' will be registered until it is destroyed. 
+ exec->registerExecInternalPlan(); + return exec; } }; diff --git a/src/mongo/db/query/internal_runner.cpp b/src/mongo/db/query/internal_runner.cpp deleted file mode 100644 index f68b82c5080..00000000000 --- a/src/mongo/db/query/internal_runner.cpp +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/query/internal_runner.h" - -#include "mongo/db/catalog/collection.h" -#include "mongo/db/diskloc.h" -#include "mongo/db/exec/plan_stage.h" -#include "mongo/db/exec/working_set.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/query/canonical_query.h" -#include "mongo/db/query/explain_plan.h" -#include "mongo/db/query/plan_executor.h" -#include "mongo/db/query/type_explain.h" - -namespace mongo { - - InternalRunner::InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws) - : _collection(collection), - _exec(new PlanExecutor(ws, root, collection)) { - - _collection->cursorCache()->registerRunner(this); - invariant( collection ); - } - - InternalRunner::~InternalRunner() { - if (_collection) { - _collection->cursorCache()->deregisterRunner(this); - } - } - - Runner::RunnerState InternalRunner::getNext(BSONObj* objOut, DiskLoc* dlOut) { - return _exec->getNext(objOut, dlOut); - } - - bool InternalRunner::isEOF() { - return _exec->isEOF(); - } - - void InternalRunner::saveState() { - _exec->saveState(); - } - - bool InternalRunner::restoreState(OperationContext* opCtx) { - return _exec->restoreState(); - } - - const std::string& InternalRunner::ns() { - return _collection->ns().ns(); - } - - void InternalRunner::invalidate(const DiskLoc& dl, InvalidationType type) { - _exec->invalidate(dl, type); - } - - void InternalRunner::kill() { - _exec->kill(); - _collection = NULL; - } - - Status InternalRunner::getInfo(TypeExplain** explain, - PlanInfo** planInfo) const { - if (NULL != explain) { - verify(_exec.get()); - - scoped_ptr<PlanStageStats> stats(_exec->getStats()); - if (NULL == stats.get()) { - return Status(ErrorCodes::InternalError, "no stats available to explain plan"); - } - - Status status = explainPlan(*stats, explain, true /* 
full details */); - if (!status.isOK()) { - return status; - } - - // Fill in explain fields that are accounted by on the runner level. - TypeExplain* chosenPlan = NULL; - explainPlan(*stats, &chosenPlan, false /* no full details */); - if (chosenPlan) { - (*explain)->addToAllPlans(chosenPlan); - } - (*explain)->setNScannedObjectsAllPlans((*explain)->getNScannedObjects()); - (*explain)->setNScannedAllPlans((*explain)->getNScanned()); - } - else if (NULL != planInfo) { - *planInfo = new PlanInfo(); - (*planInfo)->planSummary = "INTERNAL"; - } - - return Status::OK(); - } - -} // namespace mongo diff --git a/src/mongo/db/query/internal_runner.h b/src/mongo/db/query/internal_runner.h deleted file mode 100644 index d01a39606f6..00000000000 --- a/src/mongo/db/query/internal_runner.h +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2013-2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/scoped_ptr.hpp> -#include <string> - -#include "mongo/base/status.h" -#include "mongo/db/query/runner.h" - -namespace mongo { - - class BSONObj; - class CanonicalQuery; - class DiskLoc; - class OperationContext; - class PlanExecutor; - struct PlanInfo; - class PlanStage; - struct QuerySolution; - class TypeExplain; - class WorkingSet; - - /** - * This is a runner that was requested by an internal client of the query system, as opposed to - * runners that are built in response to a query entering the system. It is only used by - * internal clients of the query systems (e.g., chunk migration, index building, commands that - * traverse data such as md5, ... ) - * - * The salient feature of this Runner is that it does not interact with the cache at all. - */ - class InternalRunner : public Runner { - public: - - /** Takes ownership of root and ws. 
*/ - InternalRunner(const Collection* collection, PlanStage* root, WorkingSet* ws); - - virtual ~InternalRunner(); - - Runner::RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut); - - virtual bool isEOF(); - - virtual void saveState(); - - virtual bool restoreState(OperationContext* opCtx); - - virtual const std::string& ns(); - - virtual void invalidate(const DiskLoc& dl, InvalidationType type); - - virtual void kill(); - - virtual const Collection* collection() { return _collection; } - - /** - * Returns OK, allocating and filling in '*explain' with details of the plan used by - * this runner. Caller takes ownership of '*explain'. Similarly fills in '*planInfo', - * which the caller takes ownership of. Otherwise, return a status describing the - * error. - * - * Strictly speaking, an InternalRunner's explain is never exposed, simply because an - * InternalRunner itself is not exposed. But we implement the explain here anyway so - * to help in debugging situations. - */ - virtual Status getInfo(TypeExplain** explain, - PlanInfo** planInfo) const; - - private: - const Collection* _collection; - - boost::scoped_ptr<PlanExecutor> _exec; - }; - -} // namespace mongo diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index 3cc4a035a6e..4e627517910 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -38,14 +38,12 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/keypattern.h" #include "mongo/db/query/explain.h" +#include "mongo/db/query/explain_plan.h" #include "mongo/db/query/find_constants.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/query/get_runner.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/qlog.h" #include "mongo/db/query/query_planner_params.h" -#include "mongo/db/query/single_solution_runner.h" -#include "mongo/db/query/type_explain.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -237,15 +235,15 @@ namespace mongo { startingResult = cc->pos(); // What gives us results. - Runner* runner = cc->getRunner(); + PlanExecutor* exec = cc->getExecutor(); const int queryOptions = cc->queryOptions(); - // Get results out of the runner. - runner->restoreState(txn); + // Get results out of the executor. + exec->restoreState(txn); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { // Add result to output buffer. bb.appendBuf((void*)obj.objdata(), obj.objsize()); @@ -275,7 +273,7 @@ } // We save the client cursor when there might be more results, and hence we may receive - // another getmore. If we receive a EOF or an error, or the runner is dead, then we know + // another getmore. If we receive an EOF or an error, or 'exec' is dead, then we know // that we will not be producing more results. We indicate that the cursor is closed by // sending a cursorId of 0 back to the client. // @@ -288,16 +286,10 @@ if (Runner::RUNNER_DEAD == state || Runner::RUNNER_ERROR == state) { // Propagate this error to caller. if (Runner::RUNNER_ERROR == state) { - // Stats are helpful when errors occur.
- TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); - if (res.isOK()) { - boost::scoped_ptr<TypeExplain> errorExplain(bareExplain); - error() << "Runner error, stats:\n" - << errorExplain->stats.jsonString(Strict, true); - } - - uasserted(17406, "getMore runner error: " + + scoped_ptr<PlanStageStats> stats(exec->getStats()); + error() << "Runner error, stats: " + << statsToBSON(*stats); + uasserted(17406, "getMore executor error: " + WorkingSetCommon::toStatusString(obj)); } @@ -307,7 +299,7 @@ namespace mongo { // In the old system tailable capped cursors would be killed off at the // cursorid level. If a tailable capped cursor is nuked the cursorid // would vanish. - // + // // In the new system they die and are cleaned up later (or time out). // So this is where we get to remove the cursorid. if (0 == numResults) { @@ -325,7 +317,7 @@ namespace mongo { if (!saveClientCursor) { ccPin.deleteUnderlying(); - // cc is now invalid, as is the runner + // cc is now invalid, as is the executor cursorid = 0; cc = NULL; QLOG() << "getMore NOT saving client cursor, ended with state " @@ -335,7 +327,7 @@ namespace mongo { else { // Continue caching the ClientCursor. cc->incPos(numResults); - runner->saveState(); + exec->saveState(); QLOG() << "getMore saving client cursor ended with state " << Runner::statestr(state) << endl; @@ -368,7 +360,10 @@ namespace mongo { Status getOplogStartHack(OperationContext* txn, Collection* collection, CanonicalQuery* cq, - Runner** runnerOut) { + PlanExecutor** execOut) { + invariant(cq); + auto_ptr<CanonicalQuery> autoCq(cq); + if ( collection == NULL ) return Status(ErrorCodes::InternalError, "getOplogStartHack called with a NULL collection" ); @@ -404,14 +399,17 @@ namespace mongo { OplogStart* stage = new OplogStart(txn, collection, tsExpr, oplogws); // Takes ownership of ws and stage. - auto_ptr<InternalRunner> runner(new InternalRunner(collection, stage, oplogws)); + auto_ptr<PlanExecutor> exec(new PlanExecutor(oplogws, stage, collection)); + exec->registerExecInternalPlan(); // The stage returns a DiskLoc of where to start. DiskLoc startLoc; - Runner::RunnerState state = runner->getNext(NULL, &startLoc); + Runner::RunnerState state = exec->getNext(NULL, &startLoc); // This is normal. The start of the oplog is the beginning of the collection. - if (Runner::RUNNER_EOF == state) { return getRunner(txn, collection, cq, runnerOut); } + if (Runner::RUNNER_EOF == state) { + return getExecutor(txn, collection, autoCq.release(), execOut); + } // This is not normal. An error was encountered. if (Runner::RUNNER_ADVANCED != state) { @@ -430,8 +428,8 @@ namespace mongo { WorkingSet* ws = new WorkingSet(); CollectionScan* cs = new CollectionScan(txn, params, ws, cq->root()); - // Takes ownership of cq, cs, ws. - *runnerOut = new SingleSolutionRunner(collection, cq, NULL, cs, ws); + // Takes ownership of 'ws', 'cs', and 'cq'. + *execOut = new PlanExecutor(ws, cs, autoCq.release(), collection); return Status::OK(); } @@ -505,8 +503,8 @@ namespace mongo { QLOG() << "Running query:\n" << cq->toString(); LOG(2) << "Running query: " << cq->toStringShort(); - // Parse, canonicalize, plan, transcribe, and get a runner. - Runner* rawRunner = NULL; + // Parse, canonicalize, plan, transcribe, and get a plan executor. + PlanExecutor* rawExec = NULL; // We use this a lot below. const LiteParsedQuery& pq = cq->getParsed(); @@ -565,42 +563,46 @@ namespace mongo { return ""; } - // We'll now try to get the query runner that will execute this query for us. 
There - // are a few cases in which we know upfront which runner we should get and, therefore, + // We'll now try to get the query executor that will execute this query for us. There + // are a few cases in which we know upfront which executor we should get and, therefore, // we shortcut the selection process here. // - // (a) If the query is over a collection that doesn't exist, we get a special runner - // that's is so (a runner) which doesn't return results, the EOFRunner. + // (a) If the query is over a collection that doesn't exist, we use an EOFStage. // - // (b) if the query is a replication's initial sync one, we get a SingleSolutinRunner - // that uses a specifically designed stage that skips extents faster (see details in - // exec/oplogstart.h) + // (b) If the query is replication's initial sync, we use a specifically designed + // stage that skips extents faster (see details in exec/oplogstart.h). // - // Otherwise we go through the selection of which runner is most suited to the + // Otherwise we go through the selection of which executor is most suited to the // query + run-time context at hand. Status status = Status::OK(); if (collection == NULL) { - rawRunner = new EOFRunner(cq, cq->ns()); + LOG(2) << "Collection " << ns << " does not exist." + << " Using EOF stage: " << cq->toStringShort(); + EOFStage* eofStage = new EOFStage(); + WorkingSet* ws = new WorkingSet(); + // Takes ownership of 'cq'. + rawExec = new PlanExecutor(ws, eofStage, cq, NULL); } else if (pq.hasOption(QueryOption_OplogReplay)) { - status = getOplogStartHack(txn, collection, cq, &rawRunner); + // Takes ownership of 'cq'. + status = getOplogStartHack(txn, collection, cq, &rawExec); } else { - // Takes ownership of cq. size_t options = QueryPlannerParams::DEFAULT; if (shardingState.needCollectionMetadata(pq.ns())) { options |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } - status = getRunner(txn, collection, cq, &rawRunner, options); + // Takes ownership of 'cq'. + status = getExecutor(txn, collection, cq, &rawExec, options); } if (!status.isOK()) { - // NOTE: Do not access cq as getRunner has deleted it. + // NOTE: Do not access cq as getExecutor has deleted it. uasserted(17007, "Unable to execute query: " + status.reason()); } - verify(NULL != rawRunner); - auto_ptr<Runner> runner(rawRunner); + verify(NULL != rawExec); + auto_ptr<PlanExecutor> exec(rawExec); // We freak out later if this changes before we're done with the query. const ChunkVersion shardingVersionAtStart = shardingState.getVersion(cq->ns()); @@ -635,30 +637,28 @@ namespace mongo { BufBuilder bb(32768); bb.skip(sizeof(QueryResult)); - // How many results have we obtained from the runner? + // How many results have we obtained from the executor? int numResults = 0; // If we're replaying the oplog, we save the last time that we read. OpTime slaveReadTill; - // Do we save the Runner in a ClientCursor for getMore calls later? + // Do we save the PlanExecutor in a ClientCursor for getMore calls later? bool saveClientCursor = false; - // We turn on auto-yielding for the runner here. The runner registers itself with the - // active runners list in ClientCursor. - auto_ptr<ScopedRunnerRegistration> safety(new ScopedRunnerRegistration(runner.get())); + // The executor registers itself with the active executors list in ClientCursor.
+ auto_ptr<ScopedExecutorRegistration> safety(new ScopedExecutorRegistration(exec.get())); BSONObj obj; Runner::RunnerState state; // uint64_t numMisplacedDocs = 0; - // Have we retrieved info about which plan the runner will - // use to execute the query yet? - bool gotPlanInfo = false; - PlanInfo* rawInfo; - boost::scoped_ptr<PlanInfo> planInfo; + // Get summary info about which plan the executor is using. + PlanSummaryStats stats; + Explain::getSummaryStats(exec.get(), &stats); + curop.debug().planSummary = stats.summaryStr.c_str(); - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { // Add result to output buffer. This is unnecessary if explain info is requested if (!isExplain) { bb.appendBuf((void*)obj.objdata(), obj.objsize()); @@ -667,24 +667,6 @@ namespace mongo { // Count the result. ++numResults; - // In the case of the multi plan runner, we may not be able to - // successfully retrieve plan info until after the query starts - // to run. This is because the multi plan runner doesn't know what - // plan it will end up using until it runs candidates and selects - // the best. - // - // TODO: Do we ever want to output what the MPR is comparing? - if (!gotPlanInfo) { - Status infoStatus = runner->getInfo(NULL, &rawInfo); - if (infoStatus.isOK()) { - gotPlanInfo = true; - planInfo.reset(rawInfo); - // planSummary is really a ThreadSafeString which copies the data from - // the provided pointer. - curop.debug().planSummary = planInfo->planSummary.c_str(); - } - } - // Possibly note slave's position in the oplog. if (pq.hasOption(QueryOption_OplogReplay)) { BSONElement e = obj["ts"]; @@ -712,47 +694,30 @@ namespace mongo { << endl; // If only one result requested assume it's a findOne() and don't save the cursor. if (pq.wantMore() && 1 != pq.getNumToReturn()) { - QLOG() << " runner EOF=" << runner->isEOF() << endl; - saveClientCursor = !runner->isEOF(); + QLOG() << " executor EOF=" << exec->isEOF() << endl; + saveClientCursor = !exec->isEOF(); } break; } } - // Try to get information about the plan which the runner - // will use to execute the query, it we don't have it already. - if (!gotPlanInfo) { - Status infoStatus = runner->getInfo(NULL, &rawInfo); - if (infoStatus.isOK()) { - gotPlanInfo = true; - planInfo.reset(rawInfo); - // planSummary is really a ThreadSafeString which copies the data from - // the provided pointer. - curop.debug().planSummary = planInfo->planSummary.c_str(); - } - } - - // If we cache the runner later, we want to deregister it as it receives notifications + // If we cache the executor later, we want to deregister it as it receives notifications // anyway by virtue of being cached. // - // If we don't cache the runner later, we are deleting it, so it must be deregistered. + // If we don't cache the executor later, we are deleting it, so it must be deregistered. // - // So, no matter what, deregister the runner. + // So, no matter what, deregister the executor. safety.reset(); // Caller expects exceptions thrown in certain cases. 
if (Runner::RUNNER_ERROR == state) { - TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); - if (res.isOK()) { - boost::scoped_ptr<TypeExplain> errorExplain(bareExplain); - error() << "Runner error, stats:\n" - << errorExplain->stats.jsonString(Strict, true); - } - uasserted(17144, "Runner error: " + WorkingSetCommon::toStatusString(obj)); + scoped_ptr<PlanStageStats> stats(exec->getStats()); + error() << "Runner error, stats: " + << statsToBSON(*stats); + uasserted(17144, "Executor error: " + WorkingSetCommon::toStatusString(obj)); } - // Why save a dead runner? + // Why save a dead executor? if (Runner::RUNNER_DEAD == state) { saveClientCursor = false; } @@ -788,9 +753,9 @@ namespace mongo { if (isExplain || ctx.ctx().db()->getProfilingLevel() > 0 || elapsedMillis > serverGlobalParams.slowMS) { - // Ask the runner to produce explain information. + // Ask the executor to produce explain information. TypeExplain* bareExplain; - Status res = runner->getInfo(&bareExplain, NULL); + Status res = Explain::legacyExplain(exec.get(), &bareExplain); if (res.isOK()) { explain.reset(bareExplain); } @@ -830,21 +795,21 @@ namespace mongo { long long ccId = 0; if (saveClientCursor) { - // We won't use the runner until it's getMore'd. - runner->saveState(); + // We won't use the executor until it's getMore'd. + exec->saveState(); // Allocate a new ClientCursor. We don't have to worry about leaking it as it's // inserted into a global map by its ctor. - ClientCursor* cc = new ClientCursor(collection, runner.get(), + ClientCursor* cc = new ClientCursor(collection, exec.get(), cq->getParsed().getOptions(), cq->getParsed().getFilter()); ccId = cc->cursorid(); - QLOG() << "caching runner with cursorid " << ccId + QLOG() << "caching executor with cursorid " << ccId << " after returning " << numResults << " results" << endl; - // ClientCursor takes ownership of runner. Release to make sure it's not deleted. - runner.release(); + // ClientCursor takes ownership of executor. Release to make sure it's not deleted. + exec.release(); // TODO document if (pq.hasOption(QueryOption_OplogReplay) && !slaveReadTill.isNull()) { @@ -865,7 +830,7 @@ namespace mongo { cc->setLeftoverMaxTimeMicros(curop.getRemainingMaxTimeMicros()); } else { - QLOG() << "Not caching runner but returning " << numResults << " results.\n"; + QLOG() << "Not caching executor but returning " << numResults << " results.\n"; } // Add the results from the query into the output buffer. 
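[Editor's note: the following sketch is illustrative, not part of the patch. It strings together the calls that appear piecemeal in the new_find.cpp hunks above -- getExecutor(), ScopedExecutorRegistration (defined in plan_executor.h below), the getNext() loop, and the hand-off to ClientCursor -- to show the new consumer-side lifecycle in one place. exampleRunQuery is a hypothetical helper; result buffering, sharding checks, and most error paths are elided.]

#include "mongo/db/clientcursor.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"

namespace mongo {

    // Hypothetical helper illustrating the PlanExecutor lifecycle; not part
    // of the patch.
    void exampleRunQuery(OperationContext* txn,
                         Collection* collection,
                         CanonicalQuery* cq) {
        // Build the executor. getExecutor() takes ownership of 'cq'; per the
        // NOTE above, 'cq' must not be touched if the call fails.
        PlanExecutor* rawExec = NULL;
        uassertStatusOK(getExecutor(txn, collection, cq, &rawExec));
        auto_ptr<PlanExecutor> exec(rawExec);

        // RAII registration: the executor hears about invalidations while it
        // may yield, and is deregistered even if getNext() throws (e.g. from
        // a $where predicate).
        auto_ptr<ScopedExecutorRegistration> safety(
            new ScopedExecutorRegistration(exec.get()));

        BSONObj obj;
        Runner::RunnerState state;
        while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) {
            // ... append 'obj' to the reply buffer ...
        }

        // Deregister no matter what: a cached executor is notified through
        // its ClientCursor instead, and a dead one is about to be deleted.
        safety.reset();

        if (Runner::RUNNER_EOF == state && !exec->isEOF()) {
            // More results may arrive later (e.g. a tailable cursor): stash
            // the executor in a ClientCursor, which takes ownership and, per
            // its ctor, inserts itself into a global map (no leak).
            exec->saveState();
            new ClientCursor(collection, exec.release(), 0, BSONObj());
        }
        // Otherwise 'exec' is destroyed here and cursorid 0 goes back to the
        // client.
    }

}  // namespace mongo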
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index df809af0e68..f1f7c676b74 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -28,6 +28,7 @@ #include "mongo/db/query/plan_executor.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/working_set.h" @@ -41,6 +42,17 @@ namespace mongo { _workingSet(ws), _qs(NULL), _root(rt), + _killed(false) { + initNs(); + } + + PlanExecutor::PlanExecutor(WorkingSet* ws, PlanStage* rt, std::string ns) + : _collection(NULL), + _cq(NULL), + _workingSet(ws), + _qs(NULL), + _root(rt), + _ns(ns), _killed(false) { } PlanExecutor::PlanExecutor(WorkingSet* ws, PlanStage* rt, CanonicalQuery* cq, @@ -50,7 +62,9 @@ namespace mongo { _workingSet(ws), _qs(NULL), _root(rt), - _killed(false) { } + _killed(false) { + initNs(); + } PlanExecutor::PlanExecutor(WorkingSet* ws, PlanStage* rt, QuerySolution* qs, CanonicalQuery* cq, const Collection* collection) @@ -59,7 +73,19 @@ namespace mongo { _workingSet(ws), _qs(qs), _root(rt), - _killed(false) { } + _killed(false) { + initNs(); + } + + void PlanExecutor::initNs() { + if (NULL != _collection) { + _ns = _collection->ns().ns(); + } + else { + invariant(NULL != _cq.get()); + _ns = _cq->getParsed().ns(); + } + } PlanExecutor::~PlanExecutor() { } @@ -79,13 +105,17 @@ namespace mongo { return _root->getStats(); } + const Collection* PlanExecutor::collection() const { + return _collection; + } + void PlanExecutor::saveState() { if (!_killed) { _root->prepareToYield(); } } - bool PlanExecutor::restoreState() { + bool PlanExecutor::restoreState(OperationContext* opCtx) { if (!_killed) { - _root->recoverFromYield(); + _root->recoverFromYield(opCtx); } return !_killed; } @@ -170,8 +200,13 @@ namespace mongo { return _killed || _root->isEOF(); } + void PlanExecutor::registerExecInternalPlan() { + _safety.reset(new ScopedExecutorRegistration(this)); + } + void PlanExecutor::kill() { _killed = true; + _collection = NULL; } Status PlanExecutor::executePlan() { @@ -191,4 +226,24 @@ namespace mongo { return Status::OK(); } + const string& PlanExecutor::ns() { + return _ns; + } + + // + // ScopedExecutorRegistration + // + + ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec) + : _exec(exec) { + // Collection can be null for EOFRunner, or other places where registration is not needed + if ( _exec->collection() ) + _exec->collection()->cursorCache()->registerExecutor( exec ); + } + + ScopedExecutorRegistration::~ScopedExecutorRegistration() { + if ( _exec->collection() ) + _exec->collection()->cursorCache()->deregisterExecutor( _exec ); + } + } // namespace mongo diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 99043d9c4f4..87c8aaa285f 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -41,6 +41,25 @@ namespace mongo { class PlanStage; struct PlanStageStats; class WorkingSet; + class PlanExecutor; + + /** + * RAII approach to ensuring that plan executors are deregistered. + * + * While retrieving the first batch of results, newRunQuery manually registers the executor with + * ClientCursor. Certain query execution paths, namely $where, can throw an exception. If we + * fail to deregister the executor, we will call invalidate/kill on the + * still-registered-yet-deleted executor. 
+ * + * For any subsequent calls to getMore, the executor is already registered with ClientCursor + * by virtue of being cached, so this exception-proofing is not required. + */ + struct ScopedExecutorRegistration { + ScopedExecutorRegistration(PlanExecutor* exec); + ~ScopedExecutorRegistration(); + + PlanExecutor* const _exec; + }; /** * A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution. @@ -61,6 +80,12 @@ namespace mongo { PlanExecutor(WorkingSet* ws, PlanStage* rt, const Collection* collection); /** + * Used when we have a NULL collection and no canonical query. In this case, + * we need to explicitly pass a namespace to the plan executor. + */ + PlanExecutor(WorkingSet* ws, PlanStage* rt, std::string ns); + + /** * Used when there is a canonical query but no query solution (e.g. idhack * queries, queries against a NULL collection, queries using the subplan stage). */ @@ -96,6 +121,11 @@ namespace mongo { CanonicalQuery* getCanonicalQuery() const; /** + * The collection in which this executor is working. + */ + const Collection* collection() const; + + /** * Generates a tree of stats objects with a separate lifetime from the execution * stage tree wrapped by this PlanExecutor. The caller owns the returned pointer. * @@ -111,7 +141,7 @@ namespace mongo { void saveState(); /** TODO document me */ - bool restoreState(); + bool restoreState(OperationContext* opCtx); /** TODO document me */ void invalidate(const DiskLoc& dl, InvalidationType type); @@ -127,6 +157,22 @@ namespace mongo { bool isEOF(); /** + * Register this plan executor with the collection cursor cache so that it + * receives event notifications. + * + * Deregistration happens automatically when this plan executor is destroyed. + * + * Used just for internal plans: + * -- InternalPlanner::collectionScan(...) (see internal_plans.h) + * -- InternalPlanner::indexScan(...) (see internal_plans.h) + * -- getOplogStartHack(...) (see new_find.cpp) + * -- storeCurrentLocs(...) (see d_migrate.cpp) + * + * TODO: we probably don't need this for 2.8. + */ + void registerExecInternalPlan(); + + /** * During the yield, the database we're operating over or any collection we're relying on * may be dropped. When this happens all cursors and runners on that database and * collection are killed or deleted in some fashion. (This is how the _killed gets set.) @@ -140,7 +186,17 @@ namespace mongo { */ Status executePlan(); + /** + * Return the NS that the query is running over. + */ + const std::string& ns(); + private: + /** + * Initialize the namespace using either the canonical query or the collection. + */ + void initNs(); + // Collection over which this plan executor runs. Used to resolve record ids retrieved by // the plan stages. The collection must not be destroyed while there are active plans. const Collection* _collection; @@ -150,6 +206,11 @@ namespace mongo { boost::scoped_ptr<QuerySolution> _qs; std::auto_ptr<PlanStage> _root; + // Deregisters this executor when it is destroyed. + boost::scoped_ptr<ScopedExecutorRegistration> _safety; + + std::string _ns; + // Did somebody drop an index we care about or the namespace we're looking at? If so, // we'll be killed. bool _killed; diff --git a/src/mongo/db/query/runner.h b/src/mongo/db/query/runner.h index 9d915f4f8ae..db7a642552f 100644 --- a/src/mongo/db/query/runner.h +++ b/src/mongo/db/query/runner.h @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. 
+ #pragma once #include "mongo/base/status.h" diff --git a/src/mongo/db/query/single_solution_runner.cpp b/src/mongo/db/query/single_solution_runner.cpp index a2f4327ccc1..e7d3ce652a7 100644 --- a/src/mongo/db/query/single_solution_runner.cpp +++ b/src/mongo/db/query/single_solution_runner.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #include "mongo/db/query/single_solution_runner.h" #include "mongo/db/diskloc.h" @@ -69,7 +71,7 @@ namespace mongo { } bool SingleSolutionRunner::restoreState(OperationContext* opCtx) { - return _exec->restoreState(); + return _exec->restoreState(opCtx); } void SingleSolutionRunner::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/query/single_solution_runner.h b/src/mongo/db/query/single_solution_runner.h index 116faa51000..c48e16086cd 100644 --- a/src/mongo/db/query/single_solution_runner.h +++ b/src/mongo/db/query/single_solution_runner.h @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #pragma once #include <boost/scoped_ptr.hpp> diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index d258bd08f2d..6b841645ffb 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -48,6 +48,9 @@ namespace mongo { // stage is an ixscan with some key-skipping behavior that only distinct uses. STAGE_DISTINCT, + // Dummy stage used for receiving notifications of deletions during chunk migration. + STAGE_NOTIFY_DELETE, + STAGE_EOF, // This is more of an "internal-only" stage where we try to keep docs that were mutated @@ -64,10 +67,18 @@ namespace mongo { STAGE_IXSCAN, STAGE_LIMIT, STAGE_MOCK, + + // Implements parallelCollectionScan. + STAGE_MULTI_ITERATOR, + STAGE_MULTI_PLAN, STAGE_OPLOG_START, STAGE_OR, STAGE_PROJECTION, + + // Stage for running aggregation pipelines. + STAGE_PIPELINE_PROXY, + STAGE_SHARDING_FILTER, STAGE_SKIP, STAGE_SORT, diff --git a/src/mongo/db/query/subplan_runner.cpp b/src/mongo/db/query/subplan_runner.cpp index 4682c29b2b4..fdd212db36a 100644 --- a/src/mongo/db/query/subplan_runner.cpp +++ b/src/mongo/db/query/subplan_runner.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor. + #include "mongo/db/query/subplan_runner.h" #include "mongo/client/dbclientinterface.h" diff --git a/src/mongo/db/query/subplan_runner.h b/src/mongo/db/query/subplan_runner.h index 896772450ad..64f0b29aecb 100644 --- a/src/mongo/db/query/subplan_runner.h +++ b/src/mongo/db/query/subplan_runner.h @@ -26,6 +26,8 @@ * it in the license file. */ +// THIS FILE IS DEPRECATED -- Runner to be replaced with PlanExecutor.
+ #pragma once #include <boost/scoped_ptr.hpp> diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 3ecf77b607a..27e446765bb 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -253,13 +253,13 @@ namespace repl { // check that no items are in sources other than that // add if missing int n = 0; - auto_ptr<Runner> runner( + auto_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, localSources, ctx.db()->getCollection(txn, localSources))); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { n++; ReplSource tmp(txn, obj); if (tmp.hostName != replSettings.source) { @@ -298,13 +298,13 @@ namespace repl { } } - auto_ptr<Runner> runner( + auto_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, localSources, ctx.db()->getCollection(txn, localSources))); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { ReplSource tmp(txn, obj); if ( tmp.syncedTo.isNull() ) { DBDirectClient c(txn); diff --git a/src/mongo/db/repl/repl_info.cpp b/src/mongo/db/repl/repl_info.cpp index c05de79df5d..9d3b1e02436 100644 --- a/src/mongo/db/repl/repl_info.cpp +++ b/src/mongo/db/repl/repl_info.cpp @@ -83,14 +83,14 @@ namespace repl { { const char* localSources = "local.sources"; Client::ReadContext ctx(txn, localSources); - auto_ptr<Runner> runner( + auto_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, localSources, ctx.ctx().db()->getCollection(txn, localSources))); BSONObj obj; Runner::RunnerState state; - while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + while (Runner::RUNNER_ADVANCED == (state = exec->getNext(&obj, NULL))) { src.push_back(obj); } } diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index b65dbb324bb..1ca35ae1e83 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -234,7 +234,7 @@ namespace repl { static void syncRollbackFindCommonPoint(OperationContext* txn, DBClientConnection* them, FixUpInfo& fixUpInfo) { Client::Context ctx(txn, rsoplog); - boost::scoped_ptr<Runner> runner( + boost::scoped_ptr<PlanExecutor> exec( InternalPlanner::collectionScan(txn, rsoplog, ctx.db()->getCollection(txn, rsoplog), @@ -243,7 +243,7 @@ namespace repl { BSONObj ourObj; DiskLoc ourLoc; - if (Runner::RUNNER_ADVANCED != runner->getNext(&ourObj, &ourLoc)) { + if (Runner::RUNNER_ADVANCED != exec->getNext(&ourObj, &ourLoc)) { throw RSFatalException("our oplog empty or unreadable"); } @@ -304,7 +304,7 @@ namespace repl { theirObj = oplogCursor->nextSafe(); theirTime = theirObj["ts"]._opTime(); - if (Runner::RUNNER_ADVANCED != runner->getNext(&ourObj, &ourLoc)) { + if (Runner::RUNNER_ADVANCED != exec->getNext(&ourObj, &ourLoc)) { log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " @@ -331,7 +331,7 @@ namespace repl { else { // theirTime < ourTime refetch(fixUpInfo, ourObj); - if (Runner::RUNNER_ADVANCED != runner->getNext(&ourObj, &ourLoc)) { + if (Runner::RUNNER_ADVANCED != exec->getNext(&ourObj, &ourLoc)) { log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " diff 
--git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 1c37fa6db4c..363925b0719 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -38,7 +38,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/query/get_runner.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/storage_options.h" #include "mongo/db/operation_context_impl.h" #include "mongo/dbtests/dbtests.h" @@ -172,27 +172,28 @@ namespace DocumentSourceTests { // clean up first if this was called before _source.reset(); _registration.reset(); - _runner.reset(); + _exec.reset(); Client::WriteContext ctx(&_opCtx, ns); CanonicalQuery* cq; uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq)); - Runner* runnerBare; - uassertStatusOK(getRunner(&_opCtx, ctx.ctx().db()->getCollection(&_opCtx, ns), cq, &runnerBare)); + PlanExecutor* execBare; + uassertStatusOK(getExecutor(&_opCtx, ctx.ctx().db()->getCollection(&_opCtx, ns), + cq, &execBare)); - _runner.reset(runnerBare); - _runner->saveState(); - _registration.reset(new ScopedRunnerRegistration(_runner.get())); + _exec.reset(execBare); + _exec->saveState(); + _registration.reset(new ScopedExecutorRegistration(_exec.get())); - _source = DocumentSourceCursor::create(ns, _runner, _ctx); + _source = DocumentSourceCursor::create(ns, _exec, _ctx); } intrusive_ptr<ExpressionContext> ctx() { return _ctx; } DocumentSourceCursor* source() { return _source.get(); } private: // It is important that these are ordered to ensure correct destruction order. - boost::shared_ptr<Runner> _runner; - boost::scoped_ptr<ScopedRunnerRegistration> _registration; + boost::shared_ptr<PlanExecutor> _exec; + boost::scoped_ptr<ScopedExecutorRegistration> _registration; intrusive_ptr<ExpressionContext> _ctx; intrusive_ptr<DocumentSourceCursor> _source; }; diff --git a/src/mongo/dbtests/runner_registry.cpp b/src/mongo/dbtests/executor_registry.cpp index 970027d9450..d122e356b74 100644 --- a/src/mongo/dbtests/runner_registry.cpp +++ b/src/mongo/dbtests/executor_registry.cpp @@ -27,8 +27,8 @@ */ /** - * This file tests Runner forced yielding, ClientCursor::registerRunner, and - * ClientCursor::deregisterRunner. + * This file tests PlanExecutor forced yielding, ClientCursor::registerExecutor, and + * ClientCursor::deregisterExecutor. */ #include "mongo/client/dbclientcursor.h" @@ -45,11 +45,11 @@ #include "mongo/dbtests/dbtests.h" -namespace RunnerRegistry { +namespace ExecutorRegistry { - class RunnerRegistryBase { + class ExecutorRegistryBase { public: - RunnerRegistryBase() + ExecutorRegistryBase() : _client(&_opCtx) { _ctx.reset(new Client::WriteContext(&_opCtx, ns())); @@ -60,16 +60,16 @@ namespace RunnerRegistry { } } - ~RunnerRegistryBase() { + ~ExecutorRegistryBase() { if (_ctx.get()) { _ctx->commit(); } } /** - * Return a runner that is going over the collection in ns(). + * Return a plan executor that is going over the collection in ns(). 
*/ - Runner* getCollscan() { + PlanExecutor* getCollscan() { auto_ptr<WorkingSet> ws(new WorkingSet()); CollectionScanParams params; params.collection = collection(); @@ -77,22 +77,20 @@ namespace RunnerRegistry { params.tailable = false; auto_ptr<CollectionScan> scan(new CollectionScan(&_opCtx, params, ws.get(), NULL)); - // Create a runner to hold it + // Create a plan executor to hold it CanonicalQuery* cq; ASSERT(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); // Owns all args - auto_ptr<Runner> run(new SingleSolutionRunner(_ctx->ctx().db()->getCollection( &_opCtx, - ns() ), - cq, NULL, scan.release(), ws.release())); - return run.release(); + return new PlanExecutor(ws.release(), scan.release(), cq, + _ctx->ctx().db()->getCollection( &_opCtx, ns() )); } - void registerRunner( Runner* runner ) { - _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->registerRunner( runner ); + void registerExecutor( PlanExecutor* exec ) { + _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->registerExecutor( exec ); } - void deregisterRunner( Runner* runner ) { - _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->deregisterRunner( runner ); + void deregisterExecutor( PlanExecutor* exec ) { + _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->deregisterExecutor( exec ); } int N() { return 50; } @@ -101,7 +99,7 @@ namespace RunnerRegistry { return _ctx->ctx().db()->getCollection( &_opCtx, ns() ); } - static const char* ns() { return "unittests.RunnerRegistryDiskLocInvalidation"; } + static const char* ns() { return "unittests.ExecutorRegistryDiskLocInvalidation"; } // Order of these is important for initialization OperationContextImpl _opCtx; @@ -111,10 +109,10 @@ namespace RunnerRegistry { // Test that a registered runner receives invalidation notifications. - class RunnerRegistryDiskLocInvalid : public RunnerRegistryBase { + class ExecutorRegistryDiskLocInvalid : public ExecutorRegistryBase { public: void run() { - auto_ptr<Runner> run(getCollscan()); + auto_ptr<PlanExecutor> run(getCollscan()); BSONObj obj; // Read some of it. @@ -125,7 +123,7 @@ namespace RunnerRegistry { // Register it. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // At this point it's safe to yield. forceYield would do that. Let's now simulate some // stuff going on in the yield. @@ -136,7 +134,7 @@ namespace RunnerRegistry { // At this point, we're done yielding. We recover our lock. // Unregister the runner. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); // And clean up anything that happened before. run->restoreState(&_opCtx); @@ -153,10 +151,10 @@ namespace RunnerRegistry { }; // Test that registered runners are killed when their collection is dropped. - class RunnerRegistryDropCollection : public RunnerRegistryBase { + class ExecutorRegistryDropCollection : public ExecutorRegistryBase { public: void run() { - auto_ptr<Runner> run(getCollscan()); + auto_ptr<PlanExecutor> run(getCollscan()); BSONObj obj; // Read some of it. @@ -167,13 +165,13 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop a collection that's not ours. _client.dropCollection("unittests.someboguscollection"); // Unregister and restore state. 
- deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL)); @@ -181,25 +179,25 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop our collection. _client.dropCollection(ns()); // Unregister and restore state. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); - // Runner was killed. + // PlanExecutor was killed. ASSERT_EQUALS(Runner::RUNNER_DEAD, run->getNext(&obj, NULL)); } }; // Test that registered runners are killed when all indices are dropped on the collection. - class RunnerRegistryDropAllIndices : public RunnerRegistryBase { + class ExecutorRegistryDropAllIndices : public ExecutorRegistryBase { public: void run() { - auto_ptr<Runner> run(getCollscan()); + auto_ptr<PlanExecutor> run(getCollscan()); BSONObj obj; _client.ensureIndex(ns(), BSON("foo" << 1)); @@ -212,25 +210,25 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop all indices. _client.dropIndexes(ns()); // Unregister and restore state. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); - // Runner was killed. + // PlanExecutor was killed. ASSERT_EQUALS(Runner::RUNNER_DEAD, run->getNext(&obj, NULL)); } }; // Test that registered runners are killed when an index is dropped on the collection. - class RunnerRegistryDropOneIndex : public RunnerRegistryBase { + class ExecutorRegistryDropOneIndex : public ExecutorRegistryBase { public: void run() { - auto_ptr<Runner> run(getCollscan()); + auto_ptr<PlanExecutor> run(getCollscan()); BSONObj obj; _client.ensureIndex(ns(), BSON("foo" << 1)); @@ -243,25 +241,25 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop a specific index. _client.dropIndex(ns(), BSON("foo" << 1)); // Unregister and restore state. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); - // Runner was killed. + // PlanExecutor was killed. ASSERT_EQUALS(Runner::RUNNER_DEAD, run->getNext(&obj, NULL)); } }; // Test that registered runners are killed when their database is dropped. - class RunnerRegistryDropDatabase : public RunnerRegistryBase { + class ExecutorRegistryDropDatabase : public ExecutorRegistryBase { public: void run() { - auto_ptr<Runner> run(getCollscan()); + auto_ptr<PlanExecutor> run(getCollscan()); BSONObj obj; // Read some of it. @@ -272,7 +270,7 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB // requires a "global write lock." @@ -282,7 +280,7 @@ namespace RunnerRegistry { _ctx.reset(new Client::WriteContext(&_opCtx, ns())); // Unregister and restore state. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); ASSERT_EQUALS(Runner::RUNNER_ADVANCED, run->getNext(&obj, NULL)); @@ -290,7 +288,7 @@ namespace RunnerRegistry { // Save state and register. run->saveState(); - registerRunner(run.get()); + registerExecutor(run.get()); // Drop our DB. Once again, must give up the lock. 
_ctx->commit(); @@ -299,12 +297,12 @@ namespace RunnerRegistry { _ctx.reset(new Client::WriteContext(&_opCtx, ns())); // Unregister and restore state. - deregisterRunner(run.get()); + deregisterExecutor(run.get()); run->restoreState(&_opCtx); _ctx->commit(); _ctx.reset(); - // Runner was killed. + // PlanExecutor was killed. ASSERT_EQUALS(Runner::RUNNER_DEAD, run->getNext(&obj, NULL)); } }; @@ -313,15 +311,15 @@ namespace RunnerRegistry { class All : public Suite { public: - All() : Suite( "runner_registry" ) { } + All() : Suite( "executor_registry" ) { } void setupTests() { - add<RunnerRegistryDiskLocInvalid>(); - add<RunnerRegistryDropCollection>(); - add<RunnerRegistryDropAllIndices>(); - add<RunnerRegistryDropOneIndex>(); - add<RunnerRegistryDropDatabase>(); + add<ExecutorRegistryDiskLocInvalid>(); + add<ExecutorRegistryDropCollection>(); + add<ExecutorRegistryDropAllIndices>(); + add<ExecutorRegistryDropOneIndex>(); + add<ExecutorRegistryDropDatabase>(); } - } runnerRegistryAll; + } executorRegistryAll; -} // namespace RunnerRegistry +} // namespace ExecutorRegistry diff --git a/src/mongo/dbtests/oplogstarttests.cpp b/src/mongo/dbtests/oplogstarttests.cpp index 0c6b5c0f20b..7a8995aba0c 100644 --- a/src/mongo/dbtests/oplogstarttests.cpp +++ b/src/mongo/dbtests/oplogstarttests.cpp @@ -15,10 +15,9 @@ */ /** - * This file tests db/exec/oplogstart.{h,cpp}. OplogStart is a planner stage - * used by an InternalRunner. It is responsible for walking the oplog - * backwards in order to find where the oplog should be replayed from for - * replication. + * This file tests db/exec/oplogstart.{h,cpp}. OplogStart is an execution stage + * responsible for walking the oplog backwards in order to find where the oplog should + * be replayed from for replication. */ #include "mongo/dbtests/dbtests.h" @@ -27,7 +26,6 @@ #include "mongo/db/exec/oplogstart.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/query/canonical_query.h" -#include "mongo/db/query/internal_runner.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/operation_context_impl.h" diff --git a/src/mongo/dbtests/query_single_solution_runner.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 4f4ddd09570..477f5db80f6 100644 --- a/src/mongo/dbtests/query_single_solution_runner.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -37,19 +37,19 @@ #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_solution.h" -#include "mongo/db/query/single_solution_runner.h" #include "mongo/dbtests/dbtests.h" -namespace QuerySingleSolutionRunner { +namespace QueryPlanExecutor { - class SingleSolutionRunnerBase { + class PlanExecutorBase { public: - SingleSolutionRunnerBase() : _client(&_txn) { - + PlanExecutorBase() : _client(&_txn) { + } - virtual ~SingleSolutionRunnerBase() { + virtual ~PlanExecutorBase() { _client.dropCollection(ns()); } @@ -75,13 +75,12 @@ namespace QuerySingleSolutionRunner { /** * Given a match expression, represented as the BSON object 'filterObj', - * create a SingleSolutionRunner capable of executing a simple collection + * create a PlanExecutor capable of executing a simple collection * scan. * - * The caller takes ownership of the returned SingleSolutionRunner*. + * The caller takes ownership of the returned PlanExecutor*. 
*/ - SingleSolutionRunner* makeCollScanRunner(Client::Context& ctx, - BSONObj& filterObj) { + PlanExecutor* makeCollScanExec(Client::Context& ctx, BSONObj& filterObj) { CollectionScanParams csparams; csparams.collection = ctx.db()->getCollection( &_txn, ns() ); csparams.direction = CollectionScanParams::FORWARD; @@ -97,13 +96,10 @@ namespace QuerySingleSolutionRunner { verify(CanonicalQuery::canonicalize(ns(), filterObj, &cq).isOK()); verify(NULL != cq); - // Hand the plan off to the single solution runner. - SingleSolutionRunner* ssr = new SingleSolutionRunner(ctx.db()->getCollection(&_txn, ns()), - cq, - new QuerySolution(), - root.release(), - ws.release()); - return ssr; + // Hand the plan off to the executor. + PlanExecutor* exec = new PlanExecutor(ws.release(), root.release(), cq, + ctx.db()->getCollection(&_txn, ns())); + return exec; } /** @@ -114,13 +110,13 @@ namespace QuerySingleSolutionRunner { * @param end -- the lower bound (inclusive) at which to end the * index scan * - * Returns a SingleSolutionRunner capable of executing an index scan + * Returns a PlanExecutor capable of executing an index scan * over the specified index with the specified bounds. * - * The caller takes ownership of the returned SingleSolutionRunner*. + * The caller takes ownership of the returned PlanExecutor*. */ - SingleSolutionRunner* makeIndexScanRunner(Client::Context& context, - BSONObj& indexSpec, int start, int end) { + PlanExecutor* makeIndexScanExec(Client::Context& context, + BSONObj& indexSpec, int start, int end) { // Build the index scan stage. IndexScanParams ixparams; ixparams.descriptor = getIndex(context.db(), indexSpec); @@ -140,13 +136,11 @@ namespace QuerySingleSolutionRunner { verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); verify(NULL != cq); - // Hand the plan off to the single solution runner. - return new SingleSolutionRunner(coll, - cq, new QuerySolution(), - root.release(), ws.release()); + // Hand the plan off to the executor. + return new PlanExecutor(ws.release(), root.release(), cq, coll); } - static const char* ns() { return "unittests.QueryStageSingleSolutionRunner"; } + static const char* ns() { return "unittests.QueryPlanExecutor"; } size_t numCursors() { Client::ReadContext ctx(&_txn, ns() ); @@ -156,16 +150,16 @@ namespace QuerySingleSolutionRunner { return collection->cursorCache()->numCursors(); } - void registerRunner( Runner* runner ) { + void registerExec( PlanExecutor* exec ) { Client::ReadContext ctx(&_txn, ns()); Collection* collection = ctx.ctx().db()->getOrCreateCollection( &_txn, ns() ); - return collection->cursorCache()->registerRunner( runner ); + collection->cursorCache()->registerExecutor( exec ); } - void deregisterRunner( Runner* runner ) { + void deregisterExec( PlanExecutor* exec ) { Client::ReadContext ctx(&_txn, ns()); Collection* collection = ctx.ctx().db()->getOrCreateCollection( &_txn, ns() ); - return collection->cursorCache()->deregisterRunner( runner ); + collection->cursorCache()->deregisterExecutor( exec ); } protected: @@ -182,9 +176,9 @@ namespace QuerySingleSolutionRunner { /** * Test dropping the collection while the - * SingleSolutionRunner is doing a collection scan. + * PlanExecutor is doing a collection scan. 
*/ - class DropCollScan : public SingleSolutionRunnerBase { + class DropCollScan : public PlanExecutorBase { public: void run() { Client::WriteContext ctx(&_txn, ns()); @@ -192,19 +186,19 @@ namespace QuerySingleSolutionRunner { insert(BSON("_id" << 2)); BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj)); - registerRunner(ssr.get()); + scoped_ptr<PlanExecutor> exec(makeCollScanExec(ctx.ctx(),filterObj)); + registerExec(exec.get()); BSONObj objOut; - ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(1, objOut["_id"].numberInt()); // After dropping the collection, the runner // should be dead. dropCollection(); - ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_DEAD, exec->getNext(&objOut, NULL)); - deregisterRunner(ssr.get()); + deregisterExec(exec.get()); ctx.commit(); } }; @@ -213,7 +207,7 @@ namespace QuerySingleSolutionRunner { * Test dropping the collection while the * SingleSolutionRunner is doing an index scan. */ - class DropIndexScan : public SingleSolutionRunnerBase { + class DropIndexScan : public PlanExecutorBase { public: void run() { Client::WriteContext ctx(&_txn, ns()); @@ -223,24 +217,24 @@ namespace QuerySingleSolutionRunner { BSONObj indexSpec = BSON("a" << 1); addIndex(indexSpec); - scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 7, 10)); - registerRunner(ssr.get()); + scoped_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.ctx(), indexSpec, 7, 10)); + registerExec(exec.get()); BSONObj objOut; - ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(7, objOut["a"].numberInt()); // After dropping the collection, the runner // should be dead. dropCollection(); - ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_DEAD, exec->getNext(&objOut, NULL)); - deregisterRunner(ssr.get()); + deregisterExec(exec.get()); ctx.commit(); } }; - class SnapshotBase : public SingleSolutionRunnerBase { + class SnapshotBase : public PlanExecutorBase { protected: void setupCollection() { insert(BSON("_id" << 1 << "a" << 1)); @@ -265,15 +259,15 @@ namespace QuerySingleSolutionRunner { } /** - * Given an array of ints, 'expectedIds', and a SingleSolutionRunner, - * 'ssr', uses the runner to iterate through the collection. While + * Given an array of ints, 'expectedIds', and a PlanExecutor, + * 'exec', uses the executor to iterate through the collection. While * iterating, asserts that the _id of each successive document equals * the respective integer in 'expectedIds'. 
*/ - void checkIds(int* expectedIds, SingleSolutionRunner* ssr) { + void checkIds(int* expectedIds, PlanExecutor* exec) { BSONObj objOut; int idcount = 0; - while (Runner::RUNNER_ADVANCED == ssr->getNext(&objOut, NULL)) { + while (Runner::RUNNER_ADVANCED == exec->getNext(&objOut, NULL)) { ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt()); ++idcount; } @@ -292,16 +286,16 @@ namespace QuerySingleSolutionRunner { setupCollection(); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeCollScanRunner(ctx.ctx(),filterObj)); + scoped_ptr<PlanExecutor> exec(makeCollScanExec(ctx.ctx(),filterObj)); BSONObj objOut; - ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(2, objOut["a"].numberInt()); forceDocumentMove(); int ids[] = {3, 4, 2}; - checkIds(ids, ssr.get()); + checkIds(ids, exec.get()); ctx.commit(); } }; @@ -320,10 +314,10 @@ namespace QuerySingleSolutionRunner { addIndex(indexSpec); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - scoped_ptr<SingleSolutionRunner> ssr(makeIndexScanRunner(ctx.ctx(), indexSpec, 2, 5)); + scoped_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.ctx(), indexSpec, 2, 5)); BSONObj objOut; - ASSERT_EQUALS(Runner::RUNNER_ADVANCED, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(2, objOut["a"].numberInt()); forceDocumentMove(); @@ -331,7 +325,7 @@ namespace QuerySingleSolutionRunner { // Since this time we're scanning the _id index, // we should not see the moved document again. int ids[] = {3, 4}; - checkIds(ids, ssr.get()); + checkIds(ids, exec.get()); ctx.commit(); } }; @@ -343,18 +337,18 @@ namespace QuerySingleSolutionRunner { /** * Test invalidation of ClientCursor. */ - class Invalidate : public SingleSolutionRunnerBase { + class Invalidate : public PlanExecutorBase { public: void run() { Client::WriteContext ctx(&_txn, ns()); insert(BSON("a" << 1 << "b" << 1)); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); + PlanExecutor* exec = makeCollScanExec(ctx.ctx(),filterObj); // Make a client cursor from the runner. new ClientCursor(ctx.ctx().db()->getCollection(&_txn, ns()), - ssr, 0, BSONObj()); + exec, 0, BSONObj()); // There should be one cursor before invalidation, // and zero cursors after invalidation. @@ -369,7 +363,7 @@ namespace QuerySingleSolutionRunner { * Test that pinned client cursors persist even after * invalidation. */ - class InvalidatePinned : public SingleSolutionRunnerBase { + class InvalidatePinned : public PlanExecutorBase { public: void run() { Client::WriteContext ctx(&_txn, ns()); @@ -378,11 +372,11 @@ namespace QuerySingleSolutionRunner { Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns()); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); + PlanExecutor* exec = makeCollScanExec(ctx.ctx(),filterObj); // Make a client cursor from the runner. ClientCursor* cc = new ClientCursor(collection, - ssr, 0, BSONObj()); + exec, 0, BSONObj()); ClientCursorPin ccPin(collection,cc->cursorid()); // If the cursor is pinned, it sticks around, @@ -393,7 +387,7 @@ namespace QuerySingleSolutionRunner { // The invalidation should have killed the runner. 
BSONObj objOut; - ASSERT_EQUALS(Runner::RUNNER_DEAD, ssr->getNext(&objOut, NULL)); + ASSERT_EQUALS(Runner::RUNNER_DEAD, exec->getNext(&objOut, NULL)); // Deleting the underlying cursor should cause the // number of cursors to return to 0. @@ -407,7 +401,7 @@ namespace QuerySingleSolutionRunner { * Test that client cursors time out and get * deleted. */ - class Timeout : public SingleSolutionRunnerBase { + class Timeout : public PlanExecutorBase { public: void run() { { @@ -421,10 +415,10 @@ namespace QuerySingleSolutionRunner { Collection* collection = ctx.ctx().db()->getCollection(&_txn, ns()); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - SingleSolutionRunner* ssr = makeCollScanRunner(ctx.ctx(),filterObj); + PlanExecutor* exec = makeCollScanExec(ctx.ctx(),filterObj); // Make a client cursor from the runner. - new ClientCursor(collection, ssr, 0, BSONObj()); + new ClientCursor(collection, exec, 0, BSONObj()); } // There should be one cursor before timeout, @@ -439,7 +433,7 @@ namespace QuerySingleSolutionRunner { class All : public Suite { public: - All() : Suite( "query_single_solution_runner" ) { } + All() : Suite( "query_plan_executor" ) { } void setupTests() { add<DropCollScan>(); @@ -450,6 +444,6 @@ namespace QuerySingleSolutionRunner { add<ClientCursor::InvalidatePinned>(); add<ClientCursor::Timeout>(); } - } queryMultiPlanRunnerAll; + } queryPlanExecutorAll; -} // namespace QuerySingleSolutionRunner +} // namespace QueryPlanExecutor diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index 2f7fa19bea5..36e292b8c4f 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -186,7 +186,7 @@ namespace QueryStageAnd { } } size_t memUsageAfter = ah->getMemUsage(); - ah->recoverFromYield(); + ah->recoverFromYield(&_txn); // Invalidating a read object should decrease memory usage. ASSERT_LESS_THAN(memUsageAfter, memUsageBefore); @@ -291,7 +291,7 @@ namespace QueryStageAnd { // Look ahead results do not count towards memory usage. ASSERT_EQUALS(memUsageBefore, memUsageAfter); - ah->recoverFromYield(); + ah->recoverFromYield(&_txn); // The deleted obj should show up in flagged. ASSERT_EQUALS(size_t(1), flagged.size()); @@ -802,7 +802,7 @@ namespace QueryStageAnd { ah->prepareToYield(); ah->invalidate(*data.begin(), INVALIDATION_DELETION); remove(coll->docFor(*data.begin())); - ah->recoverFromYield(); + ah->recoverFromYield(&_txn); // Make sure the nuked obj is actually in the flagged data. ASSERT_EQUALS(ws.getFlagged().size(), size_t(1)); @@ -841,7 +841,7 @@ namespace QueryStageAnd { ah->prepareToYield(); ah->invalidate(*it, INVALIDATION_DELETION); remove(coll->docFor(*it)); - ah->recoverFromYield(); + ah->recoverFromYield(&_txn); // Get all results aside from the two we killed. while (!ah->isEOF()) { diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index 06428a98900..797d0af3d28 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -281,7 +281,7 @@ namespace QueryStageCollectionScan { scan->prepareToYield(); scan->invalidate(locs[count], INVALIDATION_DELETION); remove(coll->docFor(locs[count])); - scan->recoverFromYield(); + scan->recoverFromYield(&_txn); // Skip over locs[count]. 
++count; @@ -343,7 +343,7 @@ namespace QueryStageCollectionScan { scan->prepareToYield(); scan->invalidate(locs[count], INVALIDATION_DELETION); remove(coll->docFor(locs[count])); - scan->recoverFromYield(); + scan->recoverFromYield(&_txn); // Skip over locs[count]. ++count; diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp index 6a7a039008e..d457342c12f 100644 --- a/src/mongo/dbtests/query_stage_count.cpp +++ b/src/mongo/dbtests/query_stage_count.cpp @@ -333,7 +333,7 @@ namespace QueryStageCount { count.prepareToYield(); // Recover from yield - count.recoverFromYield(); + count.recoverFromYield(&_txn); // finish counting while (PlanStage::IS_EOF != countState) { @@ -388,7 +388,7 @@ namespace QueryStageCount { remove(BSON("a" << GTE << 5)); // Recover from yield - count.recoverFromYield(); + count.recoverFromYield(&_txn); // finish counting while (PlanStage::IS_EOF != countState) { @@ -446,7 +446,7 @@ namespace QueryStageCount { insert(BSON("a" << 6.5)); // Recover from yield - count.recoverFromYield(); + count.recoverFromYield(&_txn); // finish counting while (PlanStage::IS_EOF != countState) { @@ -501,7 +501,7 @@ namespace QueryStageCount { insert(BSON("a" << BSON_ARRAY(10 << 11))); // Recover from yield - count.recoverFromYield(); + count.recoverFromYield(&_txn); // finish counting while (PlanStage::IS_EOF != countState) { @@ -625,7 +625,7 @@ namespace QueryStageCount { remove(BSON("a" << 1 << "b" << 5)); // Recover from yield - count.recoverFromYield(); + count.recoverFromYield(&_txn); // finish counting while (PlanStage::IS_EOF != countState) { diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index 0a1492c4b0d..1358315a5ca 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -547,7 +547,7 @@ namespace QueryStageMergeSortTests { // Invalidate locs[11]. Should force a fetch. We don't get it back. ms->prepareToYield(); ms->invalidate(*it, INVALIDATION_DELETION); - ms->recoverFromYield(); + ms->recoverFromYield(&_txn); // Make sure locs[11] was fetched for us. { diff --git a/src/mongo/dbtests/query_stage_near.cpp b/src/mongo/dbtests/query_stage_near.cpp index c4939d16e75..ae31c1e6249 100644 --- a/src/mongo/dbtests/query_stage_near.cpp +++ b/src/mongo/dbtests/query_stage_near.cpp @@ -85,7 +85,7 @@ namespace { virtual void prepareToYield() { } - virtual void recoverFromYield() { + virtual void recoverFromYield(OperationContext* opCtx) { } virtual void invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 81e500cb238..fe04b977f16 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -289,7 +289,7 @@ namespace QueryStageSortTests { ss->prepareToYield(); set<DiskLoc>::iterator it = locs.begin(); ss->invalidate(*it++, INVALIDATION_DELETION); - ss->recoverFromYield(); + ss->recoverFromYield(&_txn); // Read the rest of the data from the mock stage. while (!ms->isEOF()) { @@ -305,7 +305,7 @@ namespace QueryStageSortTests { while (it != locs.end()) { ss->invalidate(*it++, INVALIDATION_DELETION); } - ss->recoverFromYield(); + ss->recoverFromYield(&_txn); // Invalidation of data in the sort stage fetches it but passes it through. 
int count = 0; diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 591cb3162fd..5c7e1061dbe 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -53,6 +53,7 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/plan_stage.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/field_parser.h" #include "mongo/db/hasher.h" @@ -263,7 +264,7 @@ namespace mongo { "section" << endl; - _dummyRunner.reset( NULL ); + _deleteNotifyExec.reset( NULL ); Lock::GlobalWrite lk(txn->lockState()); log() << "MigrateFromStatus::done Global lock acquired" << endl; @@ -423,8 +424,13 @@ namespace mongo { return false; } - invariant( _dummyRunner.get() == NULL ); - _dummyRunner.reset(new DummyRunner(txn, _ns, collection)); + invariant( _deleteNotifyExec.get() == NULL ); + WorkingSet* ws = new WorkingSet(); + DeleteNotificationStage* dns = new DeleteNotificationStage(); + // Takes ownership of 'ws' and 'dns'. + PlanExecutor* deleteNotifyExec = new PlanExecutor(ws, dns, collection); + deleteNotifyExec->registerExecInternalPlan(); + _deleteNotifyExec.reset(deleteNotifyExec); // Allow multiKey based on the invariant that shard keys must be single-valued. // Therefore, any multi-key index prefixed by shard key cannot be multikey over @@ -442,7 +448,8 @@ namespace mongo { BSONObj min = Helpers::toKeyFormat( kp.extendRangeBound( _min, false ) ); BSONObj max = Helpers::toKeyFormat( kp.extendRangeBound( _max, false ) ); - auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, idx, min, max, false)); + auto_ptr<PlanExecutor> exec( + InternalPlanner::indexScan(txn, collection, idx, min, max, false)); // use the average object size to estimate how many objects a full chunk would carry // do that while traversing the chunk's range using the sharding index, below @@ -465,7 +472,7 @@ namespace mongo { bool isLargeChunk = false; unsigned long long recCount = 0;; DiskLoc dl; - while (Runner::RUNNER_ADVANCED == runner->getNext(NULL, &dl)) { + while (Runner::RUNNER_ADVANCED == exec->getNext(NULL, &dl)) { if ( ! isLargeChunk ) { scoped_spinlock lk( _trackerLocks ); _cloneLocs.insert( dl ); @@ -475,7 +482,7 @@ namespace mongo { isLargeChunk = true; } } - runner.reset(); + exec.reset(); if ( isLargeChunk ) { warning() << "cannot move chunk: the maximum number of documents for a chunk is " @@ -628,26 +635,16 @@ namespace mongo { bool _getActive() const { scoped_lock l(_mutex); return _active; } void _setActive( bool b ) { scoped_lock l(_mutex); _active = b; } - - class DummyRunner : public Runner { + /** + * Used to receive invalidation notifications. + * + * XXX: move to the exec/ directory. 
+ */
+ class DeleteNotificationStage : public PlanStage {
 public:
- DummyRunner(OperationContext* txn,
- const StringData& ns,
- Collection* collection ) {
- _ns = ns.toString();
- _txn = txn;
- _collection = collection;
- _collection->cursorCache()->registerRunner( this );
- }
- ~DummyRunner() {
- if ( !_collection )
- return;
- Client::ReadContext ctx(_txn, _ns);
- Collection* collection = ctx.ctx().db()->getCollection( _txn, _ns );
- invariant( _collection == collection );
- _collection->cursorCache()->deregisterRunner( this );
- }
- virtual RunnerState getNext(BSONObj* objOut, DiskLoc* dlOut) {
+ virtual void invalidate(const DiskLoc& dl, InvalidationType type);
+
+ virtual StageState work(WorkingSetID* out) {
 invariant( false );
 }
 virtual bool isEOF() {
@@ -655,38 +652,42 @@ namespace mongo { return false; } virtual void kill() {
- _collection = NULL;
 }
- virtual void saveState() {
+ virtual void prepareToYield() {
 invariant( false );
 }
- virtual bool restoreState(OperationContext* opCtx) {
+ virtual void recoverFromYield(OperationContext* opCtx) {
 invariant( false );
 }
- virtual const string& ns() {
+ virtual PlanStageStats* getStats() {
 invariant( false );
- return _ns;
+ return NULL;
 }
- virtual void invalidate(const DiskLoc& dl, InvalidationType type);
- virtual const Collection* collection() {
- return _collection;
+ virtual CommonStats* getCommonStats() {
+ invariant( false );
+ return NULL;
 }
- virtual Status getInfo(TypeExplain** explain, PlanInfo** planInfo) const {
- return Status( ErrorCodes::InternalError, "no" );
+ virtual SpecificStats* getSpecificStats() {
+ invariant( false );
+ return NULL;
+ }
+ virtual std::vector<PlanStage*> getChildren() const {
+ invariant( false );
+ vector<PlanStage*> empty;
+ return empty;
+ }
+ virtual StageType stageType() const {
+ invariant( false );
+ return STAGE_NOTIFY_DELETE;
 }
-
- private:
- string _ns;
- OperationContext* _txn;
- Collection* _collection;
 };
- scoped_ptr<DummyRunner> _dummyRunner;
+ scoped_ptr<PlanExecutor> _deleteNotifyExec;
 } migrateFromStatus;
- void MigrateFromStatus::DummyRunner::invalidate(const DiskLoc& dl,
- InvalidationType type) {
+ void MigrateFromStatus::DeleteNotificationStage::invalidate(const DiskLoc& dl,
+ InvalidationType type) {
 if ( type == INVALIDATION_DELETION ) { migrateFromStatus.aboutToDelete( dl ); } }
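
For reference: the d_migrate change above retires the bespoke DummyRunner in favor of the standard executor plumbing -- a PlanStage whose only live method is invalidate(), owned by a PlanExecutor. Condensed from the hunks above (no new API is introduced here; registerExecInternalPlan() is the registration call the commit itself uses), the lifecycle is:

// In storeCurrentLocs(): build and register the notification executor so
// every delete in the migration range reaches invalidate().
invariant( _deleteNotifyExec.get() == NULL );
WorkingSet* ws = new WorkingSet();                 // owned by the executor
DeleteNotificationStage* dns = new DeleteNotificationStage();
PlanExecutor* deleteNotifyExec = new PlanExecutor(ws, dns, collection);
deleteNotifyExec->registerExecInternalPlan();      // cursor-cache hookup
_deleteNotifyExec.reset(deleteNotifyExec);

// Deletes arrive through the one implemented stage method:
//     if ( type == INVALIDATION_DELETION ) migrateFromStatus.aboutToDelete( dl );

// In done(): destroying the executor deregisters it.
_deleteNotifyExec.reset( NULL );

The payoff is a single registration/invalidation/kill path shared with every other PlanExecutor, instead of a parallel Runner implementation maintained only for migration.
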
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index 3841c25b177..3c7a79a207c 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -146,8 +146,9 @@ namespace mongo { max = Helpers::toKeyFormat( kp.extendRangeBound( max, false ) ); }
- auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, idx, min, max,
- false, InternalPlanner::FORWARD));
+ auto_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, idx,
+ min, max, false,
+ InternalPlanner::FORWARD));
 // Find the 'missingField' value used to represent a missing document field in a key of // this index.
@@ -163,7 +164,7 @@ namespace mongo { DiskLoc loc; BSONObj currKey;
- while (Runner::RUNNER_ADVANCED == runner->getNext(&currKey, &loc)) {
+ while (Runner::RUNNER_ADVANCED == exec->getNext(&currKey, &loc)) {
 // check that the current key contains non-missing elements for all fields in keyPattern
 BSONObjIterator i( currKey ); for( int k = 0; k < keyPatternLength ; k++ ) {
@@ -377,11 +378,12 @@ namespace mongo { long long currCount = 0; long long numChunks = 0;
- auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, idx, min, max,
+ auto_ptr<PlanExecutor> exec(
+ InternalPlanner::indexScan(txn, collection, idx, min, max,
 false, InternalPlanner::FORWARD));
 BSONObj currKey;
- Runner::RunnerState state = runner->getNext(&currKey, NULL);
+ Runner::RunnerState state = exec->getNext(&currKey, NULL);
 if (Runner::RUNNER_ADVANCED != state) { errmsg = "can't open a cursor for splitting (desired range is possibly empty)"; return false;
@@ -419,7 +421,7 @@ namespace mongo { break; }
- state = runner->getNext(&currKey, NULL);
+ state = exec->getNext(&currKey, NULL);
 } if ( ! forceMedianSplit )
@@ -435,10 +437,10 @@ namespace mongo { currCount = 0; log() << "splitVector doing another cycle because of force, keyCount now: " << keyCount << endl;
- runner.reset(InternalPlanner::indexScan(txn, collection, idx, min, max,
+ exec.reset(InternalPlanner::indexScan(txn, collection, idx, min, max,
 false, InternalPlanner::FORWARD));
- state = runner->getNext(&currKey, NULL);
+ state = exec->getNext(&currKey, NULL);
 } //
@@ -879,12 +881,12 @@ namespace mongo { BSONObj newmin = Helpers::toKeyFormat( kp.extendRangeBound( chunk.min, false) ); BSONObj newmax = Helpers::toKeyFormat( kp.extendRangeBound( chunk.max, false) );
- auto_ptr<Runner> runner(InternalPlanner::indexScan(txn, collection, idx,
- newmin, newmax, false));
+ auto_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, idx,
+ newmin, newmax, false));
 // check if exactly one document found
- if (Runner::RUNNER_ADVANCED == runner->getNext(NULL, NULL)) {
- if (Runner::RUNNER_EOF == runner->getNext(NULL, NULL)) {
+ if (Runner::RUNNER_ADVANCED == exec->getNext(NULL, NULL)) {
+ if (Runner::RUNNER_EOF == exec->getNext(NULL, NULL)) {
 result.append( "shouldMigrate", BSON("min" << chunk.min << "max" << chunk.max) ); break;
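
For reference: all three d_split call sites reduce to the same consumer loop over an internally planned index scan; only the owning type changes, and getNext() still reports Runner::RunnerState at this point in the transition. A sketch of that loop as a free function -- countKeysInRange is a hypothetical helper, and the IndexDescriptor* parameter assumes the type the callers already hold for 'idx', while InternalPlanner::indexScan and getNext() are used exactly as in the hunks above.

#include <memory>

#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_executor.h"

namespace mongo {

    // Count the index keys in the range [min, max), driving the executor the
    // same way the splitVector loops above do.
    long long countKeysInRange(OperationContext* txn,
                               Collection* collection,
                               const IndexDescriptor* idx,
                               const BSONObj& min,
                               const BSONObj& max) {
        std::auto_ptr<PlanExecutor> exec(
            InternalPlanner::indexScan(txn, collection, idx, min, max,
                                       false, InternalPlanner::FORWARD));

        long long numKeys = 0;
        BSONObj currKey;
        // Passing NULL for the DiskLoc out-parameter requests keys only, as
        // the splitVector call sites above do.
        while (Runner::RUNNER_ADVANCED == exec->getNext(&currKey, NULL)) {
            ++numKeys;
        }
        return numKeys;
    }

}  // namespace mongo
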