diff options
Diffstat (limited to 'src')
59 files changed, 1031 insertions, 587 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 9b8bb0ae780..496dbae6cae 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -264,13 +264,8 @@ namespace mongo { _nss(ns), _dblk(opCtx->lockState(), _nss.db(), MODE_IX), _collk(opCtx->lockState(), ns, MODE_IX), - _wunit(opCtx), _c(opCtx, ns) { } - void Client::WriteContext::commit() { - _wunit.commit(); - } - void Client::Context::checkNotStale() const { switch ( _client->_curOp->getOp() ) { case dbGetMore: // getMore's are special and should be handled else where diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 8f82c5d67d3..7fb00403de9 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -183,9 +183,6 @@ namespace mongo { public: WriteContext(OperationContext* opCtx, const std::string& ns); - /** Commit any writes done so far in this context. */ - void commit(); - Database* db() const { return _c.db(); } Collection* getCollection() const { @@ -199,7 +196,6 @@ namespace mongo { NamespaceString _nss; Lock::DBLock _dblk; Lock::CollectionLock _collk; - WriteUnitOfWork _wunit; Context _c; }; diff --git a/src/mongo/db/commands/count.cpp b/src/mongo/db/commands/count.cpp index cb3d6587a16..8ffb02baa13 100644 --- a/src/mongo/db/commands/count.cpp +++ b/src/mongo/db/commands/count.cpp @@ -83,13 +83,16 @@ namespace mongo { Collection* collection = ctx.getCollection(); PlanExecutor* rawExec; - Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec); + Status getExecStatus = getExecutorCount(txn, + collection, + request, + PlanExecutor::YIELD_AUTO, + &rawExec); if (!getExecStatus.isOK()) { return getExecStatus; } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); return Explain::explainStages(exec.get(), verbosity, out); } @@ -111,13 +114,16 @@ namespace mongo { Collection* collection = ctx.getCollection(); PlanExecutor* rawExec; - Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec); + Status getExecStatus = getExecutorCount(txn, + collection, + request, + PlanExecutor::YIELD_AUTO, + &rawExec); if (!getExecStatus.isOK()) { return appendCommandStatus(result, getExecStatus); } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); // Store the plan summary string in CurOp. if (NULL != txn->getCurOp()) { @@ -258,7 +264,11 @@ namespace mongo { } PlanExecutor* rawExec; - Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec); + Status getExecStatus = getExecutorCount(txn, + collection, + request, + PlanExecutor::YIELD_AUTO, + &rawExec); if (!getExecStatus.isOK()) { err = getExecStatus.reason(); errCode = getExecStatus.code(); @@ -266,7 +276,6 @@ namespace mongo { } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); // Store the plan summary string in CurOp. if (NULL != txn->getCurOp()) { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index 1da82ccc605..16f2db2b2e9 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -110,7 +110,12 @@ namespace mongo { } PlanExecutor* rawExec; - Status status = getExecutorDistinct(txn, collection, query, key, &rawExec); + Status status = getExecutorDistinct(txn, + collection, + query, + key, + PlanExecutor::YIELD_AUTO, + &rawExec); if (!status.isOK()) { uasserted(17216, mongoutils::str::stream() << "Can't get runner for query " << query << ": " << status.toString()); @@ -118,7 +123,6 @@ namespace mongo { } auto_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); BSONObj obj; PlanExecutor::ExecState state; diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 5b9fbdc7c3d..54e20e0f035 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -189,10 +189,14 @@ namespace mongo { PlanExecutor* rawExec; massert(17384, "Could not get plan executor for query " + queryOriginal.toString(), - getExecutor(txn, collection, cq, &rawExec, QueryPlannerParams::DEFAULT).isOK()); + getExecutor(txn, + collection, + cq, + PlanExecutor::YIELD_AUTO, + &rawExec, + QueryPlannerParams::DEFAULT).isOK()); scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); PlanExecutor::ExecState state; if (PlanExecutor::ADVANCED == (state = exec->getNext(&doc, NULL))) { @@ -200,6 +204,8 @@ namespace mongo { } } + WriteUnitOfWork wuow(txn); + BSONObj queryModified = queryOriginal; if ( found && doc["_id"].type() && ! CanonicalQuery::isSimpleIdQuery( queryOriginal ) ) { // we're going to re-write the query to be more efficient @@ -335,7 +341,7 @@ namespace mongo { } } - cx.commit(); + wuow.commit(); return true; } diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 8f39e2624a6..d0194e9d1eb 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -103,7 +103,12 @@ namespace mongo { options |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } - execStatus = getExecutor(txn, collection, cq.release(), &rawExec, options); + execStatus = getExecutor(txn, + collection, + cq.release(), + PlanExecutor::YIELD_AUTO, + &rawExec, + options); } if (!execStatus.isOK()) { @@ -111,7 +116,6 @@ namespace mongo { } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); // Got the execution tree. Explain it. return Explain::explainStages(exec.get(), verbosity, out); diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp index eb60bbdf09d..e47c0bac76d 100644 --- a/src/mongo/db/commands/geo_near_cmd.cpp +++ b/src/mongo/db/commands/geo_near_cmd.cpp @@ -182,13 +182,12 @@ namespace mongo { } PlanExecutor* rawExec; - if (!getExecutor(txn, collection, cq, &rawExec, 0).isOK()) { + if (!getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, 0).isOK()) { errmsg = "can't get query runner"; return false; } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); double totalDistance = 0; BSONObjBuilder resultBuilder(result.subarrayStart("results")); diff --git a/src/mongo/db/commands/group.cpp b/src/mongo/db/commands/group.cpp index eb426a2504c..6f1a8a7153d 100644 --- a/src/mongo/db/commands/group.cpp +++ b/src/mongo/db/commands/group.cpp @@ -138,13 +138,16 @@ namespace mongo { Collection* coll = ctx.getCollection(); PlanExecutor *rawPlanExecutor; - Status getExecStatus = getExecutorGroup(txn, coll, groupRequest, &rawPlanExecutor); + Status getExecStatus = getExecutorGroup(txn, + coll, + groupRequest, + PlanExecutor::YIELD_AUTO, + &rawPlanExecutor); if (!getExecStatus.isOK()) { return appendCommandStatus(out, getExecStatus); } scoped_ptr<PlanExecutor> planExecutor(rawPlanExecutor); - planExecutor->setYieldPolicy(PlanExecutor::YIELD_AUTO); // Group executors return ADVANCED exactly once, with the entire group result. BSONObj retval; @@ -192,13 +195,16 @@ namespace mongo { Collection* coll = ctx.getCollection(); PlanExecutor *rawPlanExecutor; - Status getExecStatus = getExecutorGroup(txn, coll, groupRequest, &rawPlanExecutor); + Status getExecStatus = getExecutorGroup(txn, + coll, + groupRequest, + PlanExecutor::YIELD_AUTO, + &rawPlanExecutor); if (!getExecStatus.isOK()) { return getExecStatus; } scoped_ptr<PlanExecutor> planExecutor(rawPlanExecutor); - planExecutor->setYieldPolicy(PlanExecutor::YIELD_AUTO); return Explain::explainStages(planExecutor.get(), verbosity, out); } diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 537543f5d75..a0814eb21dc 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -359,6 +359,7 @@ namespace mongo { // Create the inc collection and make sure we have index on "0" key. // Intentionally not replicating the inc collection to secondaries. Client::WriteContext incCtx(_txn, _config.incLong); + WriteUnitOfWork wuow(_txn); Collection* incColl = incCtx.getCollection(); invariant(!incColl); @@ -376,7 +377,7 @@ namespace mongo { uasserted( 17305 , str::stream() << "createIndex failed for mr incLong ns: " << _config.incLong << " err: " << status.code() ); } - incCtx.commit(); + wuow.commit(); } vector<BSONObj> indexesToInsert; @@ -384,6 +385,7 @@ namespace mongo { { // copy indexes into temporary storage Client::WriteContext finalCtx(_txn, _config.outputOptions.finalNamespace); + WriteUnitOfWork wuow(_txn); Collection* const finalColl = finalCtx.getCollection(); if ( finalColl ) { IndexCatalog::IndexIterator ii = @@ -406,12 +408,13 @@ namespace mongo { indexesToInsert.push_back( b.obj() ); } } - finalCtx.commit(); + wuow.commit(); } { // create temp collection and insert the indexes from temporary storage Client::WriteContext tempCtx(_txn, _config.tempNamespace); + WriteUnitOfWork wuow(_txn); uassert(ErrorCodes::NotMaster, "no longer master", repl::getGlobalReplicationCoordinator()-> canAcceptWritesForDatabase(nsToDatabase(_config.tempNamespace.c_str()))); @@ -443,7 +446,7 @@ namespace mongo { string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes"; repl::logOp(_txn, "i", logNs.c_str(), *it); } - tempCtx.commit(); + wuow.commit(); } } @@ -664,6 +667,7 @@ namespace mongo { Client::WriteContext ctx(_txn, ns ); + WriteUnitOfWork wuow(_txn); uassert(ErrorCodes::NotMaster, "no longer master", repl::getGlobalReplicationCoordinator()-> canAcceptWritesForDatabase(nsToDatabase(ns.c_str()))); @@ -680,7 +684,7 @@ namespace mongo { coll->insertDocument( _txn, bo, true ); repl::logOp(_txn, "i", ns.c_str(), bo); - ctx.commit(); + wuow.commit(); } /** @@ -690,9 +694,10 @@ namespace mongo { verify( _onDisk ); Client::WriteContext ctx(_txn, _config.incLong ); + WriteUnitOfWork wuow(_txn); Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong); coll->insertDocument( _txn, o, true ); - ctx.commit(); + wuow.commit(); } State::State(OperationContext* txn, const Config& c) : @@ -969,6 +974,7 @@ namespace mongo { { Client::WriteContext incCtx(_txn, _config.incLong ); + WriteUnitOfWork wuow(_txn); Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong ); bool foundIndex = false; @@ -983,9 +989,9 @@ namespace mongo { break; } } - incCtx.commit(); verify( foundIndex ); + wuow.commit(); } scoped_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong)); @@ -1009,11 +1015,14 @@ namespace mongo { whereCallback).isOK()); PlanExecutor* rawExec; - verify(getExecutor(_txn, getCollectionOrUassert(ctx->getDb(), _config.incLong), - cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK()); + verify(getExecutor(_txn, + getCollectionOrUassert(ctx->getDb(), _config.incLong), + cq, + PlanExecutor::YIELD_AUTO, + &rawExec, + QueryPlannerParams::NO_TABLE_SCAN).isOK()); scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); // iterate over all sorted objects BSONObj o; @@ -1357,15 +1366,17 @@ namespace mongo { invariant(db); PlanExecutor* rawExec; - if (!getExecutor(txn, state.getCollectionOrUassert(db, config.ns), - cq, &rawExec).isOK()) { + if (!getExecutor(txn, + state.getCollectionOrUassert(db, config.ns), + cq, + PlanExecutor::YIELD_AUTO, + &rawExec).isOK()) { uasserted(17239, "Can't get executor for query " + config.filter.toString()); return 0; } scoped_ptr<PlanExecutor> exec(rawExec); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); Timer mt; diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index a8f65aa63f9..2f4c8fe34d3 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -102,13 +102,17 @@ namespace mongo { for ( size_t i = 0; i < numCursors; i++ ) { WorkingSet* ws = new WorkingSet(); MultiIteratorStage* mis = new MultiIteratorStage(txn, ws, collection); - // Takes ownership of 'ws' and 'mis'. - auto_ptr<PlanExecutor> curExec(new PlanExecutor(txn, ws, mis, collection)); - // Each of the plan executors should yield automatically. We pass "false" to - // indicate that 'curExec' should not register itself, as it will get registered - // by ClientCursor instead. - curExec->setYieldPolicy(PlanExecutor::YIELD_AUTO, false); + PlanExecutor* rawExec; + // Takes ownership of 'ws' and 'mis'. + Status execStatus = PlanExecutor::make(txn, ws, mis, collection, + PlanExecutor::YIELD_AUTO, &rawExec); + invariant(execStatus.isOK()); + auto_ptr<PlanExecutor> curExec(rawExec); + + // The PlanExecutor was registered on construction due to the YIELD_AUTO policy. + // We have to deregister it, as it will be registered with ClientCursor. + curExec->deregisterExec(); // Need to save state while yielding locks between now and newGetMore. curExec->saveState(); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 0ae4f12ebe0..ed6ee3323c2 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -245,16 +245,25 @@ namespace mongo { auto_ptr<WorkingSet> ws(new WorkingSet()); auto_ptr<PipelineProxyStage> proxy( new PipelineProxyStage(pPipeline, input, ws.get())); + Status execStatus = Status::OK(); if (NULL == collection) { - execHolder.reset(new PlanExecutor(txn, ws.release(), proxy.release(), ns)); + execStatus = PlanExecutor::make(txn, + ws.release(), + proxy.release(), + ns, + PlanExecutor::YIELD_MANUAL, + &exec); } else { - execHolder.reset(new PlanExecutor(txn, - ws.release(), - proxy.release(), - collection)); + execStatus = PlanExecutor::make(txn, + ws.release(), + proxy.release(), + collection, + PlanExecutor::YIELD_MANUAL, + &exec); } - exec = execHolder.get(); + invariant(execStatus.isOK()); + execHolder.reset(exec); if (!collection && input) { // If we don't have a collection, we won't be able to register any executors, so diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index 92544df2a6d..67459d7a9c8 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -79,13 +79,20 @@ namespace mongo { collection)); stage->addIterator(iter.release()); - std::auto_ptr<PlanExecutor> exec(new PlanExecutor(txn, - ws.release(), - stage.release(), - collection)); - - // 'exec' will be used in newGetMore(). Set its yield policy and save its state. - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + PlanExecutor* rawExec; + Status execStatus = PlanExecutor::make(txn, + ws.release(), + stage.release(), + collection, + PlanExecutor::YIELD_AUTO, + &rawExec); + invariant(execStatus.isOK()); + std::auto_ptr<PlanExecutor> exec(rawExec); + + // 'exec' will be used in newGetMore(). It was automatically registered on construction + // due to the auto yield policy, so it could yield during plan selection. We deregister + // it now so that it can be registed with ClientCursor. + exec->deregisterExec(); exec->saveState(); // ClientCursors' constructor inserts them into a global map that manages their diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp index 405d2c77948..faf5965ec04 100644 --- a/src/mongo/db/commands/test_commands.cpp +++ b/src/mongo/db/commands/test_commands.cpp @@ -165,8 +165,9 @@ namespace mongo { massert( 13418, "captrunc invalid n", PlanExecutor::ADVANCED == state); } } + WriteUnitOfWork wuow(txn); collection->temp_cappedTruncateAfter( txn, end, inc ); - ctx.commit(); + wuow.commit(); return true; } }; @@ -200,6 +201,7 @@ namespace mongo { NamespaceString nss( dbname, coll ); Client::WriteContext ctx(txn, nss.ns() ); + WriteUnitOfWork wuow(txn); Database* db = ctx.db(); Collection* collection = ctx.getCollection(); massert( 13429, "emptycapped no such collection", collection ); @@ -214,7 +216,7 @@ namespace mongo { if (!fromRepl) repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), cmdObj); - ctx.commit(); + wuow.commit(); return true; } }; diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 6bc0bef54f1..5b2026649ca 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -634,13 +634,13 @@ namespace mongo { // Step 1: Unlock all requests that are not-flush and not-global. for (size_t i = 0; i < stateOut->locks.size(); ++i) { for (size_t j = 0; j < stateOut->locks[i].recursiveCount; ++j) { - unlock(stateOut->locks[i].resourceId); + invariant(unlock(stateOut->locks[i].resourceId)); } } // Step 2: Unlock the global lock. for (size_t i = 0; i < stateOut->globalRecursiveCount; ++i) { - unlock(resourceIdGlobal); + invariant(unlock(resourceIdGlobal)); } // Step 3: Unlock flush. It's only acquired on the first global lock acquisition diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 499a6dfd965..d788f8b7970 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -645,7 +645,8 @@ namespace mongo { } PlanExecutor* rawExec; - if (!getExecutor(txn, coll, cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK()) { + if (!getExecutor(txn, coll, cq, PlanExecutor::YIELD_MANUAL, &rawExec, + QueryPlannerParams::NO_TABLE_SCAN).isOK()) { uasserted(17241, "Can't get executor for query " + query.toString()); return 0; } diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 334e87895d8..2adb42d3d87 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -130,7 +130,12 @@ namespace mongo { PlanExecutor* rawExec; size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; massert(17245, "Could not get executor for query " + query.toString(), - getExecutor(txn, collection, cq, &rawExec, options).isOK()); + getExecutor(txn, + collection, + cq, + PlanExecutor::YIELD_MANUAL, + &rawExec, + options).isOK()); auto_ptr<PlanExecutor> exec(rawExec); PlanExecutor::ExecState state; @@ -408,6 +413,8 @@ namespace mongo { verify(PlanExecutor::ADVANCED == state); + WriteUnitOfWork wuow(txn); + if ( onlyRemoveOrphanedDocs ) { // Do a final check in the write lock to make absolutely sure that our // collection hasn't been modified in a way that invalidates our migration @@ -446,7 +453,7 @@ namespace mongo { collection->deleteDocument( txn, rloc, false, false, &deletedId ); // The above throws on failure, and so is not logged repl::logOp(txn, "d", ns.c_str(), deletedId, 0, 0, fromMigrate); - ctx.commit(); + wuow.commit(); numDeleted++; } diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index ddfeeaaffd0..f4633af95a8 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -164,6 +164,12 @@ namespace mongo { _txn = opCtx; ++_commonStats.unyields; _child->restoreState(opCtx); + + const NamespaceString& ns(_collection->ns()); + massert(28537, + str::stream() << "Demoted from primary while removing from " << ns.ns(), + !_params.shouldCallLogOp || + repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db())); } void DeleteStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp index 3db8e3e4dd7..686555d15fd 100644 --- a/src/mongo/db/exec/multi_plan.cpp +++ b/src/mongo/db/exec/multi_plan.cpp @@ -148,7 +148,7 @@ namespace mongo { return state; } - void MultiPlanStage::pickBestPlan() { + Status MultiPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of // execution work that happens here, so this is needed for the time accounting to // make sense. @@ -182,11 +182,28 @@ namespace mongo { // Work the plans, stopping when a plan hits EOF or returns some // fixed number of results. for (size_t ix = 0; ix < numWorks; ++ix) { + // Yield, if it's time to yield. + if (NULL != yieldPolicy && yieldPolicy->shouldYield()) { + bool alive = yieldPolicy->yield(); + if (!alive) { + _failure = true; + Status failStat(ErrorCodes::OperationFailed, + "PlanExecutor killed during plan selection"); + _statusMemberId = WorkingSetCommon::allocateStatusMember(_candidates[0].ws, + failStat); + return failStat; + } + } + bool moreToDo = workAllPlans(numResults); if (!moreToDo) { break; } } - if (_failure) { return; } + if (_failure) { + invariant(WorkingSet::INVALID_ID != _statusMemberId); + WorkingSetMember* member = _candidates[0].ws->get(_statusMemberId); + return WorkingSetCommon::getMemberStatus(*member); + } // After picking best plan, ranking will own plan stats from // candidate solutions (winner and losers). @@ -290,6 +307,8 @@ namespace mongo { _collection->infoCache()->getPlanCache()->add(*_query, solutions, ranking.release()); } } + + return Status::OK(); } vector<PlanStageStats*> MultiPlanStage::generateCandidateStats() { @@ -341,8 +360,6 @@ namespace mongo { // Propagate most recent seen failure to parent. if (PlanStage::FAILURE == state) { - BSONObj objOut; - WorkingSetCommon::getStatusMemberObject(*candidate.ws, id, &objOut); _statusMemberId = id; } diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h index 964d71443d0..956969eb2d2 100644 --- a/src/mongo/db/exec/multi_plan.h +++ b/src/mongo/db/exec/multi_plan.h @@ -36,6 +36,7 @@ #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/query_solution.h" #include "mongo/db/query/plan_ranker.h" +#include "mongo/db/query/plan_yield_policy.h" namespace mongo { @@ -81,9 +82,15 @@ namespace mongo { /** * Runs all plans added by addPlan, ranks them, and picks a best. - * All further calls to getNext(...) will return results from the best plan. + * All further calls to work(...) will return results from the best plan. + * + * If 'yieldPolicy' is non-NULL, then all locks may be yielded in between round-robin + * works of the candidate plans. By default, 'yieldPolicy' is NULL and no yielding will + * take place. + * + * Returns a non-OK status if the plan was killed during yield. */ - void pickBestPlan(); + Status pickBestPlan(PlanYieldPolicy* yieldPolicy); /** Return true if a best plan has been chosen */ bool bestPlanChosen() const; @@ -156,20 +163,24 @@ namespace mongo { // uses -1 / kNoSuchPlan when best plan is not (yet) known int _backupPlanIdx; - // Did all plans fail while we were running them? Note that one plan can fail + // Set if this MultiPlanStage cannot continue, and the query must fail. This can happen in + // two ways. The first is that all candidate plans fail. Note that one plan can fail // during normal execution of the plan competition. Here is an example: // // Plan 1: collection scan with sort. Sort runs out of memory. // Plan 2: ixscan that provides sort. Won't run out of memory. // // We want to choose plan 2 even if plan 1 fails. + // + // The second way for failure to occur is that the execution of this query is killed during + // a yield, by some concurrent event such as a collection drop. bool _failure; // If everything fails during the plan competition, we can't pick one. size_t _failureCount; // if pickBestPlan fails, this is set to the wsid of the statusMember - // returned by ::work() + // returned by ::work() WorkingSetID _statusMemberId; // Stats diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index 75bcfb3dce9..e39024a5205 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -147,11 +147,15 @@ namespace mongo { // TODO: Do we want to do this for the user? I think so. PlanStage* rootFetch = new FetchStage(txn, ws.get(), userRoot, NULL, collection); - PlanExecutor runner(txn, ws.release(), rootFetch, collection); + PlanExecutor* rawExec; + Status execStatus = PlanExecutor::make(txn, ws.release(), rootFetch, collection, + PlanExecutor::YIELD_MANUAL, &rawExec); + fassert(28536, execStatus); + boost::scoped_ptr<PlanExecutor> exec(rawExec); BSONArrayBuilder resultBuilder(result.subarrayStart("results")); - for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) { + for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) { resultBuilder.append(obj); } diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp index 25af192a464..3159734c065 100644 --- a/src/mongo/db/exec/subplan.cpp +++ b/src/mongo/db/exec/subplan.cpp @@ -77,31 +77,6 @@ namespace mongo { } // static - Status SubplanStage::make(OperationContext* txn, - Collection* collection, - WorkingSet* ws, - const QueryPlannerParams& params, - CanonicalQuery* cq, - SubplanStage** out) { - auto_ptr<SubplanStage> autoStage(new SubplanStage(txn, collection, ws, params, cq)); - // Plan each branch of the $or. - Status planningStatus = autoStage->planSubqueries(); - if (!planningStatus.isOK()) { - return planningStatus; - } - - // Use the multi plan stage to select a winning plan for each branch, and then - // construct the overall winning plan from the resulting index tags. - Status multiPlanStatus = autoStage->pickBestPlan(); - if (!multiPlanStatus.isOK()) { - return multiPlanStatus; - } - - *out = autoStage.release(); - return Status::OK(); - } - - // static bool SubplanStage::canUseSubplanning(const CanonicalQuery& query) { const LiteParsedQuery& lpq = query.getParsed(); const MatchExpression* expr = query.root(); @@ -213,11 +188,7 @@ namespace mongo { return Status::OK(); } - Status SubplanStage::pickBestPlan() { - // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of - // work that happens here, so this is needed for the time accounting to make sense. - ScopedTimer timer(&_commonStats.executionTimeMillis); - + Status SubplanStage::choosePlanForSubqueries(PlanYieldPolicy* yieldPolicy) { // This is what we annotate with the index selections and then turn into a solution. auto_ptr<OrMatchExpression> theOr( static_cast<OrMatchExpression*>(_query->root()->shallowClone())); @@ -277,9 +248,8 @@ namespace mongo { _ws->clear(); - auto_ptr<MultiPlanStage> multiPlanStage(new MultiPlanStage(_txn, - _collection, - orChildCQ.get())); + _child.reset(new MultiPlanStage(_txn, _collection, orChildCQ.get())); + MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get()); // Dump all the solutions into the MPR. for (size_t ix = 0; ix < solutions.size(); ++ix) { @@ -294,7 +264,11 @@ namespace mongo { multiPlanStage->addPlan(solutions[ix], nextPlanRoot, _ws); } - multiPlanStage->pickBestPlan(); + Status planSelectStat = multiPlanStage->pickBestPlan(yieldPolicy); + if (!planSelectStat.isOK()) { + return planSelectStat; + } + if (!multiPlanStage->bestPlanChosen()) { mongoutils::str::stream ss; ss << "Failed to pick best plan for subchild " @@ -303,7 +277,6 @@ namespace mongo { } QuerySolution* bestSoln = multiPlanStage->bestSolution(); - _child.reset(multiPlanStage.release()); // Check that we have good cache data. For example, we don't cache things // for 2d indices. @@ -372,12 +345,13 @@ namespace mongo { // with stats obtained in the same fashion as a competitive ranking would have obtained // them. _ws->clear(); - auto_ptr<MultiPlanStage> multiPlanStage(new MultiPlanStage(_txn, _collection, _query)); + _child.reset(new MultiPlanStage(_txn, _collection, _query)); + MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get()); PlanStage* root; verify(StageBuilder::build(_txn, _collection, *soln, _ws, &root)); multiPlanStage->addPlan(soln, root, _ws); // Takes ownership first two arguments. - multiPlanStage->pickBestPlan(); + multiPlanStage->pickBestPlan(yieldPolicy); if (! multiPlanStage->bestPlanChosen()) { mongoutils::str::stream ss; ss << "Failed to pick best plan for subchild " @@ -385,7 +359,87 @@ namespace mongo { return Status(ErrorCodes::BadValue, ss); } - _child.reset(multiPlanStage.release()); + return Status::OK(); + } + + Status SubplanStage::choosePlanWholeQuery(PlanYieldPolicy* yieldPolicy) { + // Clear out the working set. We'll start with a fresh working set. + _ws->clear(); + + // Use the query planning module to plan the whole query. + vector<QuerySolution*> solutions; + Status status = QueryPlanner::plan(*_query, _plannerParams, &solutions); + if (!status.isOK()) { + return Status(ErrorCodes::BadValue, + "error processing query: " + _query->toString() + + " planner returned error: " + status.reason()); + } + + // We cannot figure out how to answer the query. Perhaps it requires an index + // we do not have? + if (0 == solutions.size()) { + return Status(ErrorCodes::BadValue, + str::stream() + << "error processing query: " + << _query->toString() + << " No query solutions"); + } + + if (1 == solutions.size()) { + PlanStage* root; + // Only one possible plan. Run it. Build the stages from the solution. + verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &root)); + _child.reset(root); + return Status::OK(); + } + else { + // Many solutions. Create a MultiPlanStage to pick the best, update the cache, + // and so on. The working set will be shared by all candidate plans. + _child.reset(new MultiPlanStage(_txn, _collection, _query)); + MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get()); + + for (size_t ix = 0; ix < solutions.size(); ++ix) { + if (solutions[ix]->cacheData.get()) { + solutions[ix]->cacheData->indexFilterApplied = + _plannerParams.indexFiltersApplied; + } + + // version of StageBuild::build when WorkingSet is shared + PlanStage* nextPlanRoot; + verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws, + &nextPlanRoot)); + + // Owns none of the arguments + multiPlanStage->addPlan(solutions[ix], nextPlanRoot, _ws); + } + + // Delegate the the MultiPlanStage's plan selection facility. + Status planSelectStat = multiPlanStage->pickBestPlan(yieldPolicy); + if (!planSelectStat.isOK()) { + return planSelectStat; + } + + return Status::OK(); + } + } + + Status SubplanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) { + // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of + // work that happens here, so this is needed for the time accounting to make sense. + ScopedTimer timer(&_commonStats.executionTimeMillis); + + // Plan each branch of the $or. + Status subplanningStatus = planSubqueries(); + if (!subplanningStatus.isOK()) { + return choosePlanWholeQuery(yieldPolicy); + } + + // Use the multi plan stage to select a winning plan for each branch, and then construct + // the overall winning plan from the resulting index tags. + Status subplanSelectStat = choosePlanForSubqueries(yieldPolicy); + if (!subplanSelectStat.isOK()) { + return choosePlanWholeQuery(yieldPolicy); + } return Status::OK(); } diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h index 477d6853194..a446eb746df 100644 --- a/src/mongo/db/exec/subplan.h +++ b/src/mongo/db/exec/subplan.h @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/db/diskloc.h" #include "mongo/db/exec/plan_stage.h" +#include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/query_solution.h" @@ -51,18 +52,11 @@ namespace mongo { */ class SubplanStage : public PlanStage { public: - /** - * Used to create SubplanStage instances. The caller owns the instance - * returned through 'out'. - * - * 'out' is valid only if an OK status is returned. - */ - static Status make(OperationContext* txn, - Collection* collection, - WorkingSet* ws, - const QueryPlannerParams& params, - CanonicalQuery* cq, - SubplanStage** out); + SubplanStage(OperationContext* txn, + Collection* collection, + WorkingSet* ws, + const QueryPlannerParams& params, + CanonicalQuery* cq); virtual ~SubplanStage(); @@ -87,13 +81,22 @@ namespace mongo { static const char* kStageType; - private: - SubplanStage(OperationContext* txn, - Collection* collection, - WorkingSet* ws, - const QueryPlannerParams& params, - CanonicalQuery* cq); + /** + * Selects a plan using subplanning. First uses the query planning results from + * planSubqueries() and the multi plan stage to select the best plan for each branch. + * + * If this effort fails, then falls back on planning the whole query normally rather + * then planning $or branches independently. + * + * If 'yieldPolicy' is non-NULL, then all locks may be yielded in between round-robin + * works of the candidate plans. By default, 'yieldPolicy' is NULL and no yielding will + * take place. + * + * Returns a non-OK status if the plan was killed during yield or if planning fails. + */ + Status pickBestPlan(PlanYieldPolicy* yieldPolicy); + private: /** * Plan each branch of the $or independently, and store the resulting * lists of query solutions in '_solutions'. @@ -107,8 +110,15 @@ namespace mongo { /** * Uses the query planning results from planSubqueries() and the multi plan stage * to select the best plan for each branch. + * + * Helper for pickBestPlan(). + */ + Status choosePlanForSubqueries(PlanYieldPolicy* yieldPolicy); + + /** + * Used as a fallback if subplanning fails. Helper for pickBestPlan(). */ - Status pickBestPlan(); + Status choosePlanWholeQuery(PlanYieldPolicy* yieldPolicy); // transactional context for read locks. Not owned by us OperationContext* _txn; diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 7e16ebd9b7a..c3a44531f7b 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -789,9 +789,40 @@ namespace mongo { _child->saveState(); } + Status UpdateStage::restoreUpdateState(OperationContext* opCtx) { + const UpdateRequest& request = *_params.request; + const NamespaceString& nsString(request.getNamespaceString()); + + // We may have stepped down during the yield. + if (request.shouldCallLogOp() && + !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsString.db())) { + return Status(ErrorCodes::NotMaster, + str::stream() << "Demoted from primary while performing update on " + << nsString.ns()); + } + + if (request.getLifecycle()) { + UpdateLifecycle* lifecycle = request.getLifecycle(); + lifecycle->setCollection(_collection); + + if (!lifecycle->canContinue()) { + return Status(ErrorCodes::IllegalOperation, + "Update aborted due to invalid state transitions after yield.", + 17270); + } + + _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(opCtx)); + } + + return Status::OK(); + } + void UpdateStage::restoreState(OperationContext* opCtx) { ++_commonStats.unyields; + // Restore our child. _child->restoreState(opCtx); + // Restore self. + uassertStatusOK(restoreUpdateState(opCtx)); } void UpdateStage::invalidate(const DiskLoc& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h index 862074e4001..b543ebb5fdf 100644 --- a/src/mongo/db/exec/update.h +++ b/src/mongo/db/exec/update.h @@ -125,6 +125,11 @@ namespace mongo { */ bool needInsert(); + /** + * Helper for restoring the state of this update. + */ + Status restoreUpdateState(OperationContext* opCtx); + UpdateStageParams _params; // Not owned by us. diff --git a/src/mongo/db/fts/fts_command_mongod.cpp b/src/mongo/db/fts/fts_command_mongod.cpp index 08396d50d9c..e44e9327234 100644 --- a/src/mongo/db/fts/fts_command_mongod.cpp +++ b/src/mongo/db/fts/fts_command_mongod.cpp @@ -113,7 +113,11 @@ namespace mongo { } PlanExecutor* rawExec; - Status getExecStatus = getExecutor(txn, ctx.getCollection(), cq, &rawExec); + Status getExecStatus = getExecutor(txn, + ctx.getCollection(), + cq, + PlanExecutor::YIELD_MANUAL, + &rawExec); if (!getExecStatus.isOK()) { errmsg = getExecStatus.reason(); return false; diff --git a/src/mongo/db/ops/delete_executor.cpp b/src/mongo/db/ops/delete_executor.cpp index 8e39be7f7a1..aca3a86bb97 100644 --- a/src/mongo/db/ops/delete_executor.cpp +++ b/src/mongo/db/ops/delete_executor.cpp @@ -126,6 +126,16 @@ namespace mongo { str::stream() << "Not primary while removing from " << ns.ns()); } + // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set + // a manual yield policy. + const bool canYield = !_request->isGod() && ( + _canonicalQuery.get() ? + !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) : + !LiteParsedQuery::isQueryIsolated(_request->getQuery())); + + PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO : + PlanExecutor::YIELD_MANUAL; + PlanExecutor* rawExec; Status getExecStatus = Status::OK(); if (_canonicalQuery.get()) { @@ -137,6 +147,7 @@ namespace mongo { _request->shouldCallLogOp(), _request->isFromMigrate(), _request->isExplain(), + policy, &rawExec); } else { @@ -149,6 +160,7 @@ namespace mongo { _request->shouldCallLogOp(), _request->isFromMigrate(), _request->isExplain(), + policy, &rawExec); } @@ -159,18 +171,6 @@ namespace mongo { invariant(rawExec); _exec.reset(rawExec); - // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set - // a manual yield policy. - const bool canYield = !_request->isGod() && ( - _canonicalQuery.get() ? - !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) : - !LiteParsedQuery::isQueryIsolated(_request->getQuery())); - - PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO : - PlanExecutor::YIELD_MANUAL; - - _exec->setYieldPolicy(policy); - return Status::OK(); } diff --git a/src/mongo/db/ops/update_executor.cpp b/src/mongo/db/ops/update_executor.cpp index 3673382b9bd..c5de729a45f 100644 --- a/src/mongo/db/ops/update_executor.cpp +++ b/src/mongo/db/ops/update_executor.cpp @@ -160,18 +160,28 @@ namespace mongo { _driver.refreshIndexKeys(lifecycle->getIndexKeys(_request->getOpCtx())); } + // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set + // a manual yield policy. + const bool canYield = !_request->isGod() && ( + _canonicalQuery.get() ? + !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) : + !LiteParsedQuery::isQueryIsolated(_request->getQuery())); + + PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO : + PlanExecutor::YIELD_MANUAL; + PlanExecutor* rawExec = NULL; Status getExecStatus = Status::OK(); if (_canonicalQuery.get()) { // This is the regular path for when we have a CanonicalQuery. getExecStatus = getExecutorUpdate(_request->getOpCtx(), db, _canonicalQuery.release(), - _request, &_driver, _opDebug, &rawExec); + _request, &_driver, _opDebug, policy, &rawExec); } else { // This is the idhack fast-path for getting a PlanExecutor without doing the work // to create a CanonicalQuery. getExecStatus = getExecutorUpdate(_request->getOpCtx(), db, nsString.ns(), _request, - &_driver, _opDebug, &rawExec); + &_driver, _opDebug, policy, &rawExec); } if (!getExecStatus.isOK()) { @@ -181,18 +191,6 @@ namespace mongo { invariant(rawExec); _exec.reset(rawExec); - // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set - // a manual yield policy. - const bool canYield = !_request->isGod() && ( - _canonicalQuery.get() ? - !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) : - !LiteParsedQuery::isQueryIsolated(_request->getQuery())); - - PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO : - PlanExecutor::YIELD_MANUAL; - - _exec->setYieldPolicy(policy); - return Status::OK(); } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 48f4e87bd87..dd0a57d8f8e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -177,8 +177,14 @@ namespace { projectionForQuery, &cq, whereCallback); + PlanExecutor* rawExec; - if (status.isOK() && getExecutor(txn, collection, cq, &rawExec, runnerOptions).isOK()) { + if (status.isOK() && getExecutor(txn, + collection, + cq, + PlanExecutor::YIELD_AUTO, + &rawExec, + runnerOptions).isOK()) { // success: The PlanExecutor will handle sorting for us using an index. exec.reset(rawExec); sortInRunner = true; @@ -203,15 +209,19 @@ namespace { whereCallback)); PlanExecutor* rawExec; - uassertStatusOK(getExecutor(txn, collection, cq, &rawExec, runnerOptions)); + uassertStatusOK(getExecutor(txn, + collection, + cq, + PlanExecutor::YIELD_AUTO, + &rawExec, + runnerOptions)); exec.reset(rawExec); } // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We - // pass "false" here to indicate that the PlanExecutor should not register itself: instead - // the output PlanExecutor will get registered with a ClientCursor. - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, false); + // deregister the PlanExecutor so that it can be registered with ClientCursor. + exec->deregisterExec(); exec->saveState(); // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 26db3998308..8c474c0f948 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -301,19 +301,10 @@ namespace mongo { && SubplanStage::canUseSubplanning(*canonicalQuery)) { QLOG() << "Running query as sub-queries: " << canonicalQuery->toStringShort(); + LOG(2) << "Running query as sub-queries: " << canonicalQuery->toStringShort(); - SubplanStage* subPlan; - Status subplanStatus = SubplanStage::make(opCtx, collection, ws, plannerParams, - canonicalQuery, &subPlan); - if (subplanStatus.isOK()) { - LOG(2) << "Running query as sub-queries: " << canonicalQuery->toStringShort(); - *rootOut = subPlan; - return Status::OK(); - } - else { - QLOG() << "Subplanner: " << subplanStatus.reason(); - // Fall back on non-subplan execution. - } + *rootOut = new SubplanStage(opCtx, collection, ws, plannerParams, canonicalQuery); + return Status::OK(); } vector<QuerySolution*> solutions; @@ -388,9 +379,6 @@ namespace mongo { multiPlanStage->addPlan(solutions[ix], nextPlanRoot, ws); } - // Do the plan selection up front. - multiPlanStage->pickBestPlan(); - *rootOut = multiPlanStage; return Status::OK(); } @@ -401,6 +389,7 @@ namespace mongo { Status getExecutor(OperationContext* txn, Collection* collection, CanonicalQuery* rawCanonicalQuery, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out, size_t plannerOptions) { auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); @@ -415,15 +404,15 @@ namespace mongo { invariant(root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. - *out = new PlanExecutor(txn, ws.release(), root, querySolution, canonicalQuery.release(), - collection); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, querySolution, canonicalQuery.release(), + collection, yieldPolicy, out); } Status getExecutor(OperationContext* txn, Collection* collection, const std::string& ns, const BSONObj& unparsedQuery, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out, size_t plannerOptions) { if (!collection) { @@ -431,8 +420,7 @@ namespace mongo { << " Using EOF stage: " << unparsedQuery.toString(); EOFStage* eofStage = new EOFStage(); WorkingSet* ws = new WorkingSet(); - *out = new PlanExecutor(txn, ws, eofStage, ns); - return Status::OK(); + return PlanExecutor::make(txn, ws, eofStage, ns, yieldPolicy, out); } if (!CanonicalQuery::isSimpleIdQuery(unparsedQuery) || @@ -446,7 +434,7 @@ namespace mongo { return status; // Takes ownership of 'cq'. - return getExecutor(txn, collection, cq, out, plannerOptions); + return getExecutor(txn, collection, cq, yieldPolicy, out, plannerOptions); } LOG(2) << "Using idhack: " << unparsedQuery.toString(); @@ -460,8 +448,7 @@ namespace mongo { root); } - *out = new PlanExecutor(txn, ws, root, collection); - return Status::OK(); + return PlanExecutor::make(txn, ws, root, collection, yieldPolicy, out); } // @@ -475,6 +462,7 @@ namespace mongo { bool shouldCallLogOp, bool fromMigrate, bool isExplain, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out) { auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); auto_ptr<WorkingSet> ws(new WorkingSet()); @@ -495,9 +483,8 @@ namespace mongo { root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. - *out = new PlanExecutor(txn, ws.release(), root, querySolution, canonicalQuery.release(), - collection); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, querySolution, canonicalQuery.release(), + collection, yieldPolicy, out); } Status getExecutorDelete(OperationContext* txn, @@ -508,6 +495,7 @@ namespace mongo { bool shouldCallLogOp, bool fromMigrate, bool isExplain, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out) { auto_ptr<WorkingSet> ws(new WorkingSet()); DeleteStageParams deleteStageParams; @@ -523,8 +511,7 @@ namespace mongo { << " Using EOF stage: " << unparsedQuery.toString(); DeleteStage* deleteStage = new DeleteStage(txn, deleteStageParams, ws.get(), NULL, new EOFStage()); - *out = new PlanExecutor(txn, ws.release(), deleteStage, ns); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), deleteStage, ns, yieldPolicy, out); } if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) && @@ -535,8 +522,7 @@ namespace mongo { ws.get()); DeleteStage* root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, idHackStage); - *out = new PlanExecutor(txn, ws.release(), root, collection); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, collection, yieldPolicy, out); } const WhereCallbackReal whereCallback(txn, collection->ns().db()); @@ -551,7 +537,7 @@ namespace mongo { // Takes ownership of 'cq'. return getExecutorDelete(txn, collection, cq, isMulti, shouldCallLogOp, - fromMigrate, isExplain, out); + fromMigrate, isExplain, yieldPolicy, out); } // @@ -564,6 +550,7 @@ namespace mongo { const UpdateRequest* request, UpdateDriver* driver, OpDebug* opDebug, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut) { auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); auto_ptr<WorkingSet> ws(new WorkingSet()); @@ -584,13 +571,14 @@ namespace mongo { root = new UpdateStage(updateStageParams, ws.get(), db, root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. Takes ownership of all args other than 'collection' and 'txn' - *execOut = new PlanExecutor(txn, - ws.release(), - root, - querySolution, - canonicalQuery.release(), - collection); - return Status::OK(); + return PlanExecutor::make(txn, + ws.release(), + root, + querySolution, + canonicalQuery.release(), + collection, + yieldPolicy, + execOut); } Status getExecutorUpdate(OperationContext* txn, @@ -599,6 +587,7 @@ namespace mongo { const UpdateRequest* request, UpdateDriver* driver, OpDebug* opDebug, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut) { auto_ptr<WorkingSet> ws(new WorkingSet()); Collection* collection = db->getCollection(request->getOpCtx(), @@ -614,8 +603,7 @@ namespace mongo { << " Using EOF stage: " << unparsedQuery.toString(); UpdateStage* updateStage = new UpdateStage(updateStageParams, ws.get(), db, new EOFStage()); - *execOut = new PlanExecutor(txn, ws.release(), updateStage, ns); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), updateStage, ns, yieldPolicy, execOut); } if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) && @@ -625,8 +613,7 @@ namespace mongo { PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(), ws.get()); UpdateStage* root = new UpdateStage(updateStageParams, ws.get(), db, idHackStage); - *execOut = new PlanExecutor(txn, ws.release(), root, collection); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, collection, yieldPolicy, execOut); } const WhereCallbackReal whereCallback(txn, collection->ns().db()); @@ -640,7 +627,7 @@ namespace mongo { return status; // Takes ownership of 'cq'. - return getExecutorUpdate(txn, db, cq, request, driver, opDebug, execOut); + return getExecutorUpdate(txn, db, cq, request, driver, opDebug, yieldPolicy, execOut); } // @@ -650,6 +637,7 @@ namespace mongo { Status getExecutorGroup(OperationContext* txn, Collection* collection, const GroupRequest& request, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut) { if (!globalScriptEngine) { return Status(ErrorCodes::BadValue, "server-side JavaScript execution is disabled"); @@ -664,8 +652,7 @@ namespace mongo { // reporting machinery always assumes that the root stage for a group operation is a // GroupStage, so in this case we put a GroupStage on top of an EOFStage. root = new GroupStage(txn, request, ws.get(), new EOFStage()); - *execOut = new PlanExecutor(txn, ws.release(), root, request.ns); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut); } const NamespaceString nss(request.ns); @@ -692,13 +679,14 @@ namespace mongo { root = new GroupStage(txn, request, ws.get(), root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. Takes ownership of all args other than 'collection'. - *execOut = new PlanExecutor(txn, - ws.release(), - root, - querySolution, - canonicalQuery.release(), - collection); - return Status::OK(); + return PlanExecutor::make(txn, + ws.release(), + root, + querySolution, + canonicalQuery.release(), + collection, + yieldPolicy, + execOut); } // @@ -880,6 +868,7 @@ namespace mongo { Status getExecutorCount(OperationContext* txn, Collection* collection, const CountRequest& request, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut) { auto_ptr<WorkingSet> ws(new WorkingSet()); PlanStage* root; @@ -890,8 +879,7 @@ namespace mongo { // reporting machinery always assumes that the root stage for a count operation is // a CountStage, so in this case we put a CountStage on top of an EOFStage. root = new CountStage(txn, collection, request, ws.get(), new EOFStage()); - *execOut = new PlanExecutor(txn, ws.release(), root, request.ns); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut); } if (request.query.isEmpty()) { @@ -899,8 +887,7 @@ namespace mongo { // for its number of records. This is implemented by the CountStage, and we don't need // to create a child for the count stage in this case. root = new CountStage(txn, collection, request, ws.get(), NULL); - *execOut = new PlanExecutor(txn, ws.release(), root, request.ns); - return Status::OK(); + return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut); } const WhereCallbackReal whereCallback(txn, collection->ns().db()); @@ -937,14 +924,14 @@ namespace mongo { root = new CountStage(txn, collection, request, ws.get(), root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be NULL. Takes ownership of all args other than 'collection' and 'txn' - *execOut = new PlanExecutor(txn, - ws.release(), - root, - querySolution, - cq.release(), - collection); - - return Status::OK(); + return PlanExecutor::make(txn, + ws.release(), + root, + querySolution, + cq.release(), + collection, + yieldPolicy, + execOut); } // @@ -1004,6 +991,7 @@ namespace mongo { Collection* collection, const BSONObj& query, const std::string& field, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out) { // This should'a been checked by the distinct command. invariant(collection); @@ -1050,7 +1038,7 @@ namespace mongo { } // Takes ownership of 'cq'. - return getExecutor(txn, collection, cq, out); + return getExecutor(txn, collection, cq, yieldPolicy, out); } // @@ -1102,15 +1090,15 @@ namespace mongo { << ", planSummary: " << Explain::getPlanSummary(root); // Takes ownership of its arguments (except for 'collection'). - *out = new PlanExecutor(txn, ws, root, soln, autoCq.release(), collection); - return Status::OK(); + return PlanExecutor::make(txn, ws, root, soln, autoCq.release(), collection, + yieldPolicy, out); } // See if we can answer the query in a fast-distinct compatible fashion. vector<QuerySolution*> solutions; status = QueryPlanner::plan(*cq, plannerParams, &solutions); if (!status.isOK()) { - return getExecutor(txn, collection, autoCq.release(), out); + return getExecutor(txn, collection, autoCq.release(), yieldPolicy, out); } // We look for a solution that has an ixscan we can turn into a distinctixscan @@ -1131,9 +1119,9 @@ namespace mongo { LOG(2) << "Using fast distinct: " << cq->toStringShort() << ", planSummary: " << Explain::getPlanSummary(root); - // Takes ownership of its arguments (except for 'collection'). - *out = new PlanExecutor(txn, ws, root, solutions[i], autoCq.release(), collection); - return Status::OK(); + // Takes ownership of 'ws', 'root', 'solutions[i]', and 'autoCq'. + return PlanExecutor::make(txn, ws, root, solutions[i], autoCq.release(), + collection, yieldPolicy, out); } } @@ -1153,7 +1141,7 @@ namespace mongo { autoCq.reset(cq); // Takes ownership of 'autoCq'. - return getExecutor(txn, collection, autoCq.release(), out); + return getExecutor(txn, collection, autoCq.release(), yieldPolicy, out); } } // namespace mongo diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 60d015f93d0..86e8156dfd6 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -71,6 +71,7 @@ namespace mongo { Status getExecutor(OperationContext* txn, Collection* collection, CanonicalQuery* rawCanonicalQuery, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out, size_t plannerOptions = 0); @@ -90,6 +91,7 @@ namespace mongo { Collection* collection, const std::string& ns, const BSONObj& unparsedQuery, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out, size_t plannerOptions = 0); @@ -113,6 +115,7 @@ namespace mongo { Collection* collection, const BSONObj& query, const std::string& field, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** out); /* @@ -125,6 +128,7 @@ namespace mongo { Status getExecutorCount(OperationContext* txn, Collection* collection, const CountRequest& request, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); // @@ -149,6 +153,7 @@ namespace mongo { bool shouldCallLogOp, bool fromMigrate, bool isExplain, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); /** @@ -168,6 +173,7 @@ namespace mongo { bool shouldCallLogOp, bool fromMigrate, bool isExplain, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); // @@ -191,6 +197,7 @@ namespace mongo { const UpdateRequest* request, UpdateDriver* driver, OpDebug* opDebug, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); /** @@ -210,6 +217,7 @@ namespace mongo { const UpdateRequest* request, UpdateDriver* driver, OpDebug* opDebug, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); // @@ -230,6 +238,7 @@ namespace mongo { Status getExecutorGroup(OperationContext* txn, Collection* collection, const GroupRequest& request, + PlanExecutor::YieldPolicy yieldPolicy, PlanExecutor** execOut); } // namespace mongo diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index c16aa251f2d..398c2ccbbb5 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -73,7 +73,16 @@ namespace mongo { if (NULL == collection) { EOFStage* eof = new EOFStage(); - return new PlanExecutor(txn, ws, eof, ns.toString()); + PlanExecutor* exec; + // Takes ownership if 'ws' and 'eof'. + Status execStatus = PlanExecutor::make(txn, + ws, + eof, + ns.toString(), + PlanExecutor::YIELD_MANUAL, + &exec); + invariant(execStatus.isOK()); + return exec; } dassert( ns == collection->ns().ns() ); @@ -90,8 +99,16 @@ namespace mongo { } CollectionScan* cs = new CollectionScan(txn, params, ws, NULL); + PlanExecutor* exec; // Takes ownership of 'ws' and 'cs'. - return new PlanExecutor(txn, ws, cs, collection); + Status execStatus = PlanExecutor::make(txn, + ws, + cs, + collection, + PlanExecutor::YIELD_MANUAL, + &exec); + invariant(execStatus.isOK()); + return exec; } /** @@ -123,7 +140,16 @@ namespace mongo { root = new FetchStage(txn, ws, root, NULL, collection); } - return new PlanExecutor(txn, ws, root, collection); + PlanExecutor* exec; + // Takes ownership of 'ws' and 'root'. + Status execStatus = PlanExecutor::make(txn, + ws, + root, + collection, + PlanExecutor::YIELD_MANUAL, + &exec); + invariant(execStatus.isOK()); + return exec; } }; diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index e091193dc37..89d724c4c30 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -455,8 +455,12 @@ namespace mongo { WorkingSet* oplogws = new WorkingSet(); OplogStart* stage = new OplogStart(txn, collection, tsExpr, oplogws); + PlanExecutor* rawExec; // Takes ownership of ws and stage. - scoped_ptr<PlanExecutor> exec(new PlanExecutor(txn, oplogws, stage, collection)); + Status execStatus = PlanExecutor::make(txn, oplogws, stage, collection, + PlanExecutor::YIELD_AUTO, &rawExec); + invariant(execStatus.isOK()); + scoped_ptr<PlanExecutor> exec(rawExec); // The stage returns a DiskLoc of where to start. DiskLoc startLoc; @@ -464,7 +468,8 @@ namespace mongo { // This is normal. The start of the oplog is the beginning of the collection. if (PlanExecutor::IS_EOF == state) { - return getExecutor(txn, collection, autoCq.release(), execOut); + return getExecutor(txn, collection, autoCq.release(), PlanExecutor::YIELD_AUTO, + execOut); } // This is not normal. An error was encountered. @@ -485,8 +490,8 @@ namespace mongo { WorkingSet* ws = new WorkingSet(); CollectionScan* cs = new CollectionScan(txn, params, ws, cq->root()); // Takes ownership of 'ws', 'cs', and 'cq'. - *execOut = new PlanExecutor(txn, ws, cs, autoCq.release(), collection); - return Status::OK(); + return PlanExecutor::make(txn, ws, cs, autoCq.release(), collection, + PlanExecutor::YIELD_AUTO, execOut); } std::string newRunQuery(OperationContext* txn, @@ -580,15 +585,7 @@ namespace mongo { // Otherwise we go through the selection of which executor is most suited to the // query + run-time context at hand. Status status = Status::OK(); - if (collection == NULL) { - LOG(2) << "Collection " << ns << " does not exist." - << " Using EOF stage: " << cq->toStringShort(); - EOFStage* eofStage = new EOFStage(); - WorkingSet* ws = new WorkingSet(); - // Takes ownership of 'cq'. - rawExec = new PlanExecutor(txn, ws, eofStage, cq, NULL); - } - else if (pq.getOptions().oplogReplay) { + if (NULL != collection && pq.getOptions().oplogReplay) { // Takes ownership of 'cq'. status = getOplogStartHack(txn, collection, cq, &rawExec); } @@ -598,7 +595,7 @@ namespace mongo { options |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } // Takes ownership of 'cq'. - status = getExecutor(txn, collection, cq, &rawExec, options); + status = getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, options); } if (!status.isOK()) { @@ -609,11 +606,6 @@ namespace mongo { verify(NULL != rawExec); auto_ptr<PlanExecutor> exec(rawExec); - // We want the PlanExecutor to yield automatically, but we handle registration of the - // executor ourselves. We want to temporarily register the executor while we are generating - // this batch of results, and then unregister and re-register with ClientCursor for getmore. - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); - // If it's actually an explain, do the explain and return rather than falling through // to the normal query execution loop. if (pq.isExplain()) { diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index 601091a567b..cca412cac8a 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -29,9 +29,11 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" +#include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/query/plan_yield_policy.h" @@ -40,46 +42,93 @@ namespace mongo { - PlanExecutor::PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - const Collection* collection) - : _opCtx(opCtx), - _collection(collection), - _cq(NULL), - _workingSet(ws), - _qs(NULL), - _root(rt), - _killed(false) { - initNs(); + namespace { + + /** + * Retrieves the first stage of a given type from the plan tree, or NULL + * if no such stage is found. + */ + PlanStage* getStageByType(PlanStage* root, StageType type) { + if (root->stageType() == type) { + return root; + } + + vector<PlanStage*> children = root->getChildren(); + for (size_t i = 0; i < children.size(); i++) { + PlanStage* result = getStageByType(children[i], type); + if (result) { + return result; + } + } + + return NULL; + } + } - PlanExecutor::PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - std::string ns) - : _opCtx(opCtx), - _collection(NULL), - _cq(NULL), - _workingSet(ws), - _qs(NULL), - _root(rt), - _ns(ns), - _killed(false) { } + // static + Status PlanExecutor::make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out) { + return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, collection, "", yieldPolicy, out); + } - PlanExecutor::PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - CanonicalQuery* cq, - const Collection* collection) - : _opCtx(opCtx), - _collection(collection), - _cq(cq), - _workingSet(ws), - _qs(NULL), - _root(rt), - _killed(false) { - initNs(); + // static + Status PlanExecutor::make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + const std::string& ns, + YieldPolicy yieldPolicy, + PlanExecutor** out) { + return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, NULL, ns, yieldPolicy, out); + } + + // static + Status PlanExecutor::make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + CanonicalQuery* cq, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out) { + return PlanExecutor::make(opCtx, ws, rt, NULL, cq, collection, "", yieldPolicy, out); + } + + // static + Status PlanExecutor::make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + QuerySolution* qs, + CanonicalQuery* cq, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out) { + return PlanExecutor::make(opCtx, ws, rt, qs, cq, collection, "", yieldPolicy, out); + } + + // static + Status PlanExecutor::make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + QuerySolution* qs, + CanonicalQuery* cq, + const Collection* collection, + const std::string& ns, + YieldPolicy yieldPolicy, + PlanExecutor** out) { + std::auto_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, ws, rt, qs, cq, collection, ns)); + + // Perform plan selection, if necessary. + Status status = exec->pickBestPlan(yieldPolicy); + if (!status.isOK()) { + return status; + } + + *out = exec.release(); + return Status::OK(); } PlanExecutor::PlanExecutor(OperationContext* opCtx, @@ -87,18 +136,22 @@ namespace mongo { PlanStage* rt, QuerySolution* qs, CanonicalQuery* cq, - const Collection* collection) + const Collection* collection, + const std::string& ns) : _opCtx(opCtx), _collection(collection), _cq(cq), _workingSet(ws), _qs(qs), _root(rt), + _ns(ns), _killed(false) { - initNs(); - } + // We may still need to initialize _ns from either _collection or _cq. + if (!_ns.empty()) { + // We already have an _ns set, so there's nothing more to do. + return; + } - void PlanExecutor::initNs() { if (NULL != _collection) { _ns = _collection->ns().ns(); } @@ -108,6 +161,31 @@ namespace mongo { } } + Status PlanExecutor::pickBestPlan(YieldPolicy policy) { + // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and + // register it to receive notifications. + this->setYieldPolicy(policy); + + // First check if we need to do subplanning. + PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN); + if (foundStage) { + SubplanStage* subplan = static_cast<SubplanStage*>(foundStage); + return subplan->pickBestPlan(_yieldPolicy.get()); + } + + // If we didn't have to do subplanning, we might still have to do regular + // multi plan selection. + foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN); + if (foundStage) { + MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage); + return mps->pickBestPlan(_yieldPolicy.get()); + } + + // Either we chose a plan, or no plan selection was required. In both cases, + // our work has been successfully completed. + return Status::OK(); + } + PlanExecutor::~PlanExecutor() { } // static @@ -180,6 +258,14 @@ namespace mongo { if (_killed) { return PlanExecutor::DEAD; } for (;;) { + // Yield if it's time to yield. + if (NULL != _yieldPolicy.get() && _yieldPolicy->shouldYield()) { + _yieldPolicy->yield(); + if (_killed) { + return PlanExecutor::DEAD; + } + } + WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState code = _root->work(&id); @@ -282,19 +368,22 @@ namespace mongo { } Status PlanExecutor::executePlan() { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState code = PlanStage::NEED_TIME; - while (PlanStage::NEED_TIME == code || PlanStage::ADVANCED == code) { - code = _root->work(&id); + BSONObj obj; + PlanExecutor::ExecState state = PlanExecutor::ADVANCED; + while (PlanExecutor::ADVANCED == state) { + state = this->getNext(&obj, NULL); } - if (PlanStage::FAILURE == code) { - BSONObj obj; - WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &obj); - return Status(ErrorCodes::BadValue, - "Exec error: " + WorkingSetCommon::toStatusString(obj)); + if (PlanExecutor::DEAD == state) { + return Status(ErrorCodes::OperationFailed, "Exec error: PlanExecutor killed"); + } + else if (PlanExecutor::EXEC_ERROR == state) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "Exec error: " + << WorkingSetCommon::toStatusString(obj)); } + invariant(PlanExecutor::IS_EOF == state); return Status::OK(); } @@ -308,7 +397,7 @@ namespace mongo { } else { invariant(PlanExecutor::YIELD_AUTO == policy); - _yieldPolicy.reset(new PlanYieldPolicy()); + _yieldPolicy.reset(new PlanYieldPolicy(this)); // Runners that yield automatically generally need to be registered so that // after yielding, they receive notifications of events like deletions and diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index db616fc7281..f7da5783174 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -89,27 +89,35 @@ namespace mongo { // // 1. Register your PlanExecutor with ClientCursor. Registered executors are informed // about DiskLoc deletions and namespace invalidation, as well as other important - // events. Do this either by calling registerExec() on the executor. This can be done - // once you get your executor, or could be done per-yield. + // events. Do this by calling registerExec() on the executor. Alternatively, this can + // be done per-yield (as described below). // - // 2. Call exec->saveState() before you yield. + // 2. Construct a PlanYieldPolicy 'policy', passing 'exec' to the constructor. // - // 3. Call Yield::yieldAllLocks(), passing in the executor's OperationContext*. This - // causes the executor to give up its locks and block so that it goes to the back of - // the scheduler's queue. + // 3. Call PlanYieldPolicy::yield() on 'policy'. If your PlanExecutor is not yet + // registered (because you want to register on a per-yield basis), then pass + // 'true' to yield(). // - // 4. Call exec->restoreState() before using the executor again. - // - // 5. The next call to exec->getNext() may return DEAD. - // - // 6. Make sure the executor gets deregistered from ClientCursor. PlanExecutor does - // this in an RAII fashion when it is destroyed, or you can explicity call - // unregisterExec() on the PlanExecutor. + // 4. The call to yield() returns a boolean indicating whether or not 'exec' is + // still alove. If it is false, then 'exec' was killed during the yield and is + // no longer valid. YIELD_MANUAL, }; // - // Constructors / destructor. + // Factory methods. + // + // On success, return a new PlanExecutor, owned by the caller, through 'out'. + // + // Passing YIELD_AUTO to any of these factories will construct a yielding runner which + // may yield in the following circumstances: + // 1) During plan selection inside the call to make(). + // 2) On any call to getNext(). + // 3) While executing the plan inside executePlan(). + // + // The runner will also be automatically registered to receive notifications in the + // case of YIELD_AUTO, so no further calls to registerExec() or setYieldPolicy() are + // necessary. // /** @@ -118,40 +126,48 @@ namespace mongo { * Right now this is only for idhack updates which neither canonicalize * nor go through normal planning. */ - PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - const Collection* collection); + static Status make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out); /** * Used when we have a NULL collection and no canonical query. In this case, * we need to explicitly pass a namespace to the plan executor. */ - PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - std::string ns); + static Status make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + const std::string& ns, + YieldPolicy yieldPolicy, + PlanExecutor** out); /** * Used when there is a canonical query but no query solution (e.g. idhack * queries, queries against a NULL collection, queries using the subplan stage). */ - PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - CanonicalQuery* cq, - const Collection* collection); + static Status make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + CanonicalQuery* cq, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out); /** * The constructor for the normal case, when you have both a canonical query * and a query solution. */ - PlanExecutor(OperationContext* opCtx, - WorkingSet* ws, - PlanStage* rt, - QuerySolution* qs, - CanonicalQuery* cq, - const Collection* collection); + static Status make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + QuerySolution* qs, + CanonicalQuery* cq, + const Collection* collection, + YieldPolicy yieldPolicy, + PlanExecutor** out); ~PlanExecutor(); @@ -228,6 +244,8 @@ namespace mongo { * For read operations, objOut or dlOut are populated with another query result. * * For write operations, the return depends on the particulars of the write stage. + * + * If an AUTO_YIELD policy is set, then this method may yield. */ ExecState getNext(BSONObj* objOut, DiskLoc* dlOut); @@ -242,6 +260,8 @@ namespace mongo { /** * Execute the plan to completion, throwing out the results. Used when you want to work the * underlying tree without getting results back. + * + * If an AUTO_YIELD policy is set on this executor, then this will automatically yield. */ Status executePlan(); @@ -315,9 +335,42 @@ namespace mongo { }; /** - * Initialize the namespace using either the canonical query or the collection. + * New PlanExecutor instances are created with the static make() methods above. + */ + PlanExecutor(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + QuerySolution* qs, + CanonicalQuery* cq, + const Collection* collection, + const std::string& ns); + + /** + * Public factory methods delegate to this private factory to do their work. + */ + static Status make(OperationContext* opCtx, + WorkingSet* ws, + PlanStage* rt, + QuerySolution* qs, + CanonicalQuery* cq, + const Collection* collection, + const std::string& ns, + YieldPolicy yieldPolicy, + PlanExecutor** out); + + /** + * Clients of PlanExecutor expect that on receiving a new instance from one of the make() + * factory methods, plan selection has already been completed. In order to enforce this + * property, this function is called to do plan selection prior to returning the new + * PlanExecutor. + * + * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage, + * this calls into their underlying plan selection facilities. Otherwise, does nothing. + * + * If an AUTO_YIELD policy is set (and document-level locking is not supported), then + * locks are yielded during plan selection. */ - void initNs(); + Status pickBestPlan(YieldPolicy policy); // The OperationContext that we're executing within. We need this in order to release // locks. diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index f8db2c1e98a..689a76973b5 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -31,61 +31,52 @@ #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/concurrency/yield.h" +#include "mongo/db/global_environment_experiment.h" namespace mongo { // Yield every 128 cycles or 10ms. These values were inherited from v2.6, which just copied // them from v2.4. - PlanYieldPolicy::PlanYieldPolicy() + PlanYieldPolicy::PlanYieldPolicy(PlanExecutor* exec) : _elapsedTracker(128, 10), - _planYielding(NULL) { } - - PlanYieldPolicy::~PlanYieldPolicy() { - if (NULL != _planYielding) { - // We were destructed mid-yield. Since we're being used to yield a runner, we have - // to deregister the runner. - if (_planYielding->collection()) { - _planYielding->collection()->cursorCache()->deregisterExecutor(_planYielding); - } - } - } - - /** - * Yield the provided runner, registering and deregistering it appropriately. Deal with - * deletion during a yield by setting _runnerYielding to ensure deregistration. - * - * Provided runner MUST be YIELD_MANUAL. - */ - bool PlanYieldPolicy::yieldAndCheckIfOK(PlanExecutor* plan) { - invariant(plan); - invariant(plan->collection()); + _planYielding(exec) { } - // If micros > 0, we should yield. - plan->saveState(); + bool PlanYieldPolicy::shouldYield() { + return _elapsedTracker.intervalHasElapsed(); + } - // If we're destructed during yield this will be used to deregister ourselves. - // This happens when we're not in a ClientCursor and somebody kills all cursors - // on the ns we're operating on. - _planYielding = plan; + bool PlanYieldPolicy::yield(bool registerPlan) { + // This is a no-op if document-level locking is supported. Doc-level locking systems + // should not need to yield. + if (supportsDocLocking()) { + return true; + } - // Register with the thing that may kill() the 'plan'. - plan->collection()->cursorCache()->registerExecutor(plan); + // No need to yield if the collection is NULL. + if (NULL == _planYielding->collection()) { + return true; + } - // Note that this call checks for interrupt, and thus can throw if interrupt flag is set. - Yield::yieldAllLocks(plan->getOpCtx(), 1); + invariant(_planYielding); - // If the plan was killed, runner->collection() will return NULL, and we can't/don't - // deregister it. - if (plan->collection()) { - plan->collection()->cursorCache()->deregisterExecutor(plan); + if (registerPlan) { + _planYielding->registerExec(); } - _planYielding = NULL; + OperationContext* opCtx = _planYielding->getOpCtx(); + invariant(opCtx); + + _planYielding->saveState(); + // Note that this call checks for interrupt, and thus can throw if interrupt flag is set. + Yield::yieldAllLocks(opCtx, 1); _elapsedTracker.resetLastTime(); - return plan->restoreState(plan->getOpCtx()); + if (registerPlan) { + _planYielding->deregisterExec(); + } + + return _planYielding->restoreState(opCtx); } } // namespace mongo - diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index cb6d1a0a9ec..50aff9656f9 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -29,26 +29,43 @@ #pragma once #include "mongo/db/catalog/collection.h" +#include "mongo/db/query/plan_executor.h" #include "mongo/util/elapsed_tracker.h" namespace mongo { class PlanYieldPolicy { public: - PlanYieldPolicy(); - ~PlanYieldPolicy(); + explicit PlanYieldPolicy(PlanExecutor* exec); + + /** + * Used by AUTO_YIELD plan executors in order to check whether it is time to yield. + * PlanExecutors give up their locks periodically in order to be fair to other + * threads. + */ + bool shouldYield(); /** - * Yield the provided runner, registering and deregistering it appropriately. Deal with - * deletion during a yield by setting _planYielding to ensure deregistration. + * Used to cause a plan executor to give up locks and go to sleep. The PlanExecutor + * must *not* be in saved state. Handles calls to save/restore state internally. * - * Provided plan executor MUST be YIELD_MANUAL. + * By default, assumes that the PlanExecutor is already registered. If 'registerPlan' + * is explicitly set to true, then the executor will get automatically registered and + * deregistered here. + * + * Returns true if the executor was restored successfully and is still alive. Returns false + * if the executor got killed during yield. */ - bool yieldAndCheckIfOK(PlanExecutor* plan); + bool yield(bool registerPlan = false); private: + // Default constructor disallowed in order to ensure initialization of '_planYielding'. + PlanYieldPolicy(); + ElapsedTracker _elapsedTracker; + // The plan executor which this yield policy is responsible for yielding. Must + // not outlive the plan executor. PlanExecutor* _planYielding; }; diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index 861d9dee426..0dc26fde8b2 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -223,7 +223,6 @@ namespace mongo { n = deleteObjects( txn, ctx.ctx().db(), ns, query, false, true ); ttlDeletedDocuments.increment( n ); - ctx.commit(); } LOG(1) << "\tTTL deleted: " << n << endl; diff --git a/src/mongo/dbtests/clienttests.cpp b/src/mongo/dbtests/clienttests.cpp index 01584aed69d..0b26a83e208 100644 --- a/src/mongo/dbtests/clienttests.cpp +++ b/src/mongo/dbtests/clienttests.cpp @@ -156,7 +156,6 @@ namespace ClientTests { ASSERT_EQUALS(1U, db.getIndexSpecs(ns()).size()); db.ensureIndex(ns(), BSON("x" << 1), true); - ctx.commit(); ASSERT_EQUALS(2, indexCatalog->numIndexesReady(&txn)); ASSERT_EQUALS(2U, db.getIndexSpecs(ns()).size()); diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index cca1395c9dd..33b94af0ff0 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -177,7 +177,11 @@ namespace DocumentSourceTests { CanonicalQuery* cq; uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq)); PlanExecutor* execBare; - uassertStatusOK(getExecutor(&_opCtx, ctx.getCollection(), cq, &execBare)); + uassertStatusOK(getExecutor(&_opCtx, + ctx.getCollection(), + cq, + PlanExecutor::YIELD_MANUAL, + &execBare)); _exec.reset(execBare); _exec->saveState(); diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp index 6ad15d20cbb..821e81ac377 100644 --- a/src/mongo/dbtests/executor_registry.cpp +++ b/src/mongo/dbtests/executor_registry.cpp @@ -60,12 +60,6 @@ namespace ExecutorRegistry { } } - ~ExecutorRegistryBase() { - if (_ctx.get()) { - _ctx->commit(); - } - } - /** * Return a plan executor that is going over the collection in ns(). */ @@ -80,17 +74,33 @@ namespace ExecutorRegistry { // Create a plan executor to hold it CanonicalQuery* cq; ASSERT(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); - // Owns all args - return new PlanExecutor(&_opCtx, ws.release(), scan.release(), cq, - _ctx->ctx().db()->getCollection( &_opCtx, ns() )); + PlanExecutor* exec; + // Takes ownership of 'ws', 'scan', and 'cq'. + Status status = PlanExecutor::make(&_opCtx, + ws.release(), + scan.release(), + cq, + _ctx->ctx().db()->getCollection(&_opCtx, ns()), + PlanExecutor::YIELD_MANUAL, + &exec); + ASSERT_OK(status); + return exec; } void registerExecutor( PlanExecutor* exec ) { - _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->registerExecutor( exec ); + WriteUnitOfWork wuow(&_opCtx); + _ctx->ctx().db()->getOrCreateCollection(&_opCtx, ns()) + ->cursorCache() + ->registerExecutor(exec); + wuow.commit(); } void deregisterExecutor( PlanExecutor* exec ) { - _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->deregisterExecutor( exec ); + WriteUnitOfWork wuow(&_opCtx); + _ctx->ctx().db()->getOrCreateCollection(&_opCtx, ns()) + ->cursorCache() + ->deregisterExecutor(exec); + wuow.commit(); } int N() { return 50; } @@ -278,7 +288,6 @@ namespace ExecutorRegistry { // Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB // requires a "global write lock." - _ctx->commit(); _ctx.reset(); _client.dropDatabase("somesillydb"); _ctx.reset(new Client::WriteContext(&_opCtx, ns())); @@ -295,7 +304,6 @@ namespace ExecutorRegistry { registerExecutor(run.get()); // Drop our DB. Once again, must give up the lock. - _ctx->commit(); _ctx.reset(); _client.dropDatabase("unittests"); _ctx.reset(new Client::WriteContext(&_opCtx, ns())); @@ -303,7 +311,6 @@ namespace ExecutorRegistry { // Unregister and restore state. deregisterExecutor(run.get()); run->restoreState(&_opCtx); - _ctx->commit(); _ctx.reset(); // PlanExecutor was killed. diff --git a/src/mongo/dbtests/indexcatalogtests.cpp b/src/mongo/dbtests/indexcatalogtests.cpp index a18488c542a..5d27c3d9200 100644 --- a/src/mongo/dbtests/indexcatalogtests.cpp +++ b/src/mongo/dbtests/indexcatalogtests.cpp @@ -34,24 +34,27 @@ namespace IndexCatalogTests { IndexIteratorTests() { OperationContextImpl txn; Client::WriteContext ctx(&txn, _ns); + WriteUnitOfWork wuow(&txn); _db = ctx.db(); _coll = _db->createCollection(&txn, _ns); _catalog = _coll->getIndexCatalog(); - ctx.commit(); + wuow.commit(); } ~IndexIteratorTests() { OperationContextImpl txn; Client::WriteContext ctx(&txn, _ns); + WriteUnitOfWork wuow(&txn); _db->dropCollection(&txn, _ns); - ctx.commit(); + wuow.commit(); } void run() { OperationContextImpl txn; Client::WriteContext ctx(&txn, _ns); + WriteUnitOfWork wuow(&txn); int numFinishedIndexesStart = _catalog->numIndexesReady(&txn); @@ -77,7 +80,7 @@ namespace IndexCatalogTests { } } - ctx.commit(); + wuow.commit(); ASSERT_TRUE(indexesIterated == _catalog->numIndexesReady(&txn)); ASSERT_TRUE(foundIndex); } diff --git a/src/mongo/dbtests/indexupdatetests.cpp b/src/mongo/dbtests/indexupdatetests.cpp index 28d1d707289..fe8200fee38 100644 --- a/src/mongo/dbtests/indexupdatetests.cpp +++ b/src/mongo/dbtests/indexupdatetests.cpp @@ -55,13 +55,14 @@ namespace IndexUpdateTests { public: IndexBuildBase() : _ctx(&_txn, _ns), + _wunit(&_txn), _client(&_txn) { _client.createCollection( _ns ); } ~IndexBuildBase() { _client.dropCollection( _ns ); - _ctx.commit(); // just for testing purposes + _wunit.commit(); // just for testing purposes getGlobalEnvironment()->unsetKillAllOperations(); } Collection* collection() { @@ -117,6 +118,7 @@ namespace IndexUpdateTests { OperationContextImpl _txn; Client::WriteContext _ctx; + WriteUnitOfWork _wunit; DBDirectClient _client; }; diff --git a/src/mongo/dbtests/plan_ranking.cpp b/src/mongo/dbtests/plan_ranking.cpp index 6018e7f2616..41ec196a6fe 100644 --- a/src/mongo/dbtests/plan_ranking.cpp +++ b/src/mongo/dbtests/plan_ranking.cpp @@ -72,7 +72,6 @@ namespace PlanRankingTests { Client::WriteContext ctx(&_txn, ns); _client.dropCollection(ns); - ctx.commit(); } virtual ~PlanRankingTestBase() { @@ -84,13 +83,11 @@ namespace PlanRankingTests { void insert(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns); _client.insert(ns, obj); - ctx.commit(); } void addIndex(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns); _client.ensureIndex(ns, obj); - ctx.commit(); } /** @@ -125,8 +122,9 @@ namespace PlanRankingTests { // Takes ownership of all (actually some) arguments. _mps->addPlan(solutions[i], root, ws.get()); } - - _mps->pickBestPlan(); // This is what sets a backup plan, should we test for it. + // This is what sets a backup plan, should we test for it. NULL means that there + // is no yield policy for this MultiPlanStage's plan selection. + _mps->pickBestPlan(NULL); ASSERT(_mps->bestPlanChosen()); size_t bestPlanIdx = _mps->bestPlanIdx(); diff --git a/src/mongo/dbtests/query_multi_plan_runner.cpp b/src/mongo/dbtests/query_multi_plan_runner.cpp index 8e622df8507..7da7cb7c28c 100644 --- a/src/mongo/dbtests/query_multi_plan_runner.cpp +++ b/src/mongo/dbtests/query_multi_plan_runner.cpp @@ -72,31 +72,26 @@ namespace QueryMultiPlanRunner { MultiPlanRunnerBase() : _client(&_txn) { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } virtual ~MultiPlanRunnerBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void addIndex(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns()); _client.ensureIndex(ns(), obj); - ctx.commit(); } void insert(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns()); _client.insert(ns(), obj); - ctx.commit(); } void remove(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns()); _client.remove(ns(), obj); - ctx.commit(); } static const char* ns() { return "unittests.QueryStageMultiPlanRunner"; } @@ -160,18 +155,23 @@ namespace QueryMultiPlanRunner { mps->addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get()); mps->addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get()); - // Plan 0 aka the first plan aka the index scan should be the best. - mps->pickBestPlan(); + // Plan 0 aka the first plan aka the index scan should be the best. NULL means that + // 'mps' will not yield during plan selection. + mps->pickBestPlan(NULL); ASSERT(mps->bestPlanChosen()); ASSERT_EQUALS(0, mps->bestPlanIdx()); // Takes ownership of arguments other than 'collection'. - PlanExecutor exec(&_txn, sharedWs.release(), mps, cq, coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, sharedWs.release(), mps, cq, coll, + PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); // Get all our results out. int results = 0; BSONObj obj; - while (PlanExecutor::ADVANCED == exec.getNext(&obj, NULL)) { + while (PlanExecutor::ADVANCED == exec->getNext(&obj, NULL)) { ASSERT_EQUALS(obj["foo"].numberInt(), 7); ++results; } @@ -235,8 +235,8 @@ namespace QueryMultiPlanRunner { mps->addPlan(solutions[i], root, ws.get()); } - // This sets a backup plan. - mps->pickBestPlan(); + // This sets a backup plan. NULL means that 'mps' will not yield. + mps->pickBestPlan(NULL); ASSERT(mps->bestPlanChosen()); ASSERT(mps->hasBackupPlan()); diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 693c8e3a72c..1fa7212831d 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -97,8 +97,11 @@ namespace QueryPlanExecutor { // Make the stage. auto_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root())); + PlanExecutor* exec; // Hand the plan off to the executor. - PlanExecutor* exec = new PlanExecutor(&_txn, ws.release(), root.release(), cq, coll); + Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll, + PlanExecutor::YIELD_MANUAL, &exec); + ASSERT_OK(stat); return exec; } @@ -136,8 +139,12 @@ namespace QueryPlanExecutor { verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); verify(NULL != cq); + PlanExecutor* exec; // Hand the plan off to the executor. - return new PlanExecutor(&_txn, ws.release(), root.release(), cq, coll); + Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll, + PlanExecutor::YIELD_MANUAL, &exec); + ASSERT_OK(stat); + return exec; } static const char* ns() { return "unittests.QueryPlanExecutor"; } @@ -153,15 +160,19 @@ namespace QueryPlanExecutor { void registerExec( PlanExecutor* exec ) { // TODO: This is not correct (create collection under S-lock) AutoGetCollectionForRead ctx(&_txn, ns()); + WriteUnitOfWork wunit(&_txn); Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); collection->cursorCache()->registerExecutor( exec ); + wunit.commit(); } void deregisterExec( PlanExecutor* exec ) { // TODO: This is not correct (create collection under S-lock) AutoGetCollectionForRead ctx(&_txn, ns()); + WriteUnitOfWork wunit(&_txn); Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); collection->cursorCache()->deregisterExecutor( exec ); + wunit.commit(); } protected: @@ -203,7 +214,6 @@ namespace QueryPlanExecutor { ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); deregisterExec(exec.get()); - ctx.commit(); } }; @@ -233,7 +243,6 @@ namespace QueryPlanExecutor { ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); deregisterExec(exec.get()); - ctx.commit(); } }; @@ -270,8 +279,12 @@ namespace QueryPlanExecutor { std::auto_ptr<PipelineProxyStage> proxy( new PipelineProxyStage(pipeline, innerExec, ws.get())); Collection* collection = ctx.getCollection(); - boost::scoped_ptr<PlanExecutor> outerExec( - new PlanExecutor(&_txn, ws.release(), proxy.release(), collection)); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws.release(), proxy.release(), collection, + PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> outerExec(rawExec); // Only the outer executor gets registered. registerExec(outerExec.get()); @@ -284,7 +297,6 @@ namespace QueryPlanExecutor { ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL)); deregisterExec(outerExec.get()); - ctx.commit(); } }; @@ -352,7 +364,6 @@ namespace QueryPlanExecutor { int ids[] = {3, 4, 2}; checkIds(ids, exec.get()); - ctx.commit(); } }; @@ -382,7 +393,6 @@ namespace QueryPlanExecutor { // we should not see the moved document again. int ids[] = {3, 4}; checkIds(ids, exec.get()); - ctx.commit(); } }; @@ -412,7 +422,6 @@ namespace QueryPlanExecutor { ASSERT_EQUALS(1U, numCursors()); coll->cursorCache()->invalidateAll(false); ASSERT_EQUALS(0U, numCursors()); - ctx.commit(); } }; @@ -449,7 +458,6 @@ namespace QueryPlanExecutor { // number of cursors to return to 0. ccPin.deleteUnderlying(); ASSERT_EQUALS(0U, numCursors()); - ctx.commit(); } }; @@ -463,7 +471,6 @@ namespace QueryPlanExecutor { { Client::WriteContext ctx(&_txn, ns()); insert(BSON("a" << 1 << "b" << 1)); - ctx.commit(); } { diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index c2e3a3630fb..645aba10cd2 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -160,7 +160,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -249,7 +251,6 @@ namespace QueryStageAnd { ASSERT_GREATER_THAN_OR_EQUALS(elt.numberInt(), 10); } - ctx.commit(); ASSERT_EQUALS(10, count); } }; @@ -262,7 +263,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -335,7 +338,6 @@ namespace QueryStageAnd { ++count; } - ctx.commit(); ASSERT_EQUALS(count, 20); } }; @@ -348,7 +350,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -378,7 +382,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // foo == bar == baz, and foo<=20, bar>=10, so our values are: // foo == 10, 11, 12, 13, 14, 15. 16, 17, 18, 19, 20 @@ -397,7 +400,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Generate large keys for {foo: 1, big: 1} index. @@ -432,7 +437,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // Stage execution should fail. ASSERT_EQUALS(-1, countResults(ah.get())); @@ -449,7 +453,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Generate large keys for {baz: 1, big: 1} index. @@ -484,7 +490,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // foo == bar == baz, and foo<=20, bar>=10, so our values are: // foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20. @@ -500,7 +505,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -539,7 +546,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // foo == bar == baz, and foo<=20, bar>=10, 5<=baz<=15, so our values are: // foo == 10, 11, 12, 13, 14, 15. @@ -561,7 +567,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Generate large keys for {bar: 1, big: 1} index. @@ -605,7 +613,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // Stage execution should fail. ASSERT_EQUALS(-1, countResults(ah.get())); @@ -620,7 +627,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -660,7 +669,6 @@ namespace QueryStageAnd { if (PlanStage::ADVANCED != status) { continue; } ++count; } - ctx.commit(); ASSERT_EQUALS(0, count); @@ -679,7 +687,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 10; ++i) { @@ -713,7 +723,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = false; params.direction = -1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); ASSERT_EQUALS(0, countResults(ah.get())); } @@ -727,7 +736,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -761,7 +772,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // Bar == 97 ASSERT_EQUALS(1, countResults(ah.get())); @@ -779,7 +789,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -814,7 +826,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // Check that the AndHash stage returns docs {foo: 10, bar: 10} // through {foo: 20, bar: 20}. @@ -837,7 +848,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -872,7 +885,6 @@ namespace QueryStageAnd { // constructor means there is no filter. FetchStage* fetch = new FetchStage(&_txn, &ws, secondScan, NULL, coll); ah->addChild(fetch); - ctx.commit(); // Check that the AndHash stage returns docs {foo: 10, bar: 10} // through {foo: 20, bar: 20}. @@ -900,7 +912,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Insert a bunch of data @@ -1000,7 +1014,6 @@ namespace QueryStageAnd { ASSERT_TRUE(member->getFieldDotted("bar", &elt)); ASSERT_EQUALS(1, elt.numberInt()); } - ctx.commit(); ASSERT_EQUALS(count, 48); @@ -1017,7 +1030,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Insert a bunch of data @@ -1056,7 +1071,6 @@ namespace QueryStageAnd { // baz == 1 params.descriptor = getIndex(BSON("baz" << 1), coll); ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); ASSERT_EQUALS(50, countResults(ah.get())); } @@ -1070,10 +1084,11 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } - for (int i = 0; i < 50; ++i) { insert(BSON("foo" << 8 << "bar" << 20)); } @@ -1101,7 +1116,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); ASSERT_EQUALS(0, countResults(ah.get())); } @@ -1115,7 +1129,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -1149,7 +1165,6 @@ namespace QueryStageAnd { params.bounds.endKeyInclusive = true; params.direction = 1; ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); ASSERT_EQUALS(0, countResults(ah.get())); } @@ -1163,7 +1178,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -1193,7 +1210,6 @@ namespace QueryStageAnd { // bar == 1 params.descriptor = getIndex(BSON("bar" << 1), coll); ah->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); // Filter drops everything. ASSERT_EQUALS(0, countResults(ah.get())); @@ -1208,7 +1224,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } for (int i = 0; i < 50; ++i) { @@ -1253,7 +1271,6 @@ namespace QueryStageAnd { } lastId = id; } - ctx.commit(); ASSERT_EQUALS(count, 43); } @@ -1270,7 +1287,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Insert a bunch of data @@ -1302,7 +1321,6 @@ namespace QueryStageAnd { // bar == 1 params.descriptor = getIndex(BSON("bar" << 1), coll); as->addChild(new IndexScan(&_txn, params, &ws, NULL)); - ctx.commit(); for (int i = 0; i < 50; i++) { BSONObj obj = getNext(as.get(), &ws); @@ -1323,7 +1341,9 @@ namespace QueryStageAnd { Database* db = ctx.db(); Collection* coll = ctx.getCollection(); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } // Insert a bunch of data @@ -1355,7 +1375,6 @@ namespace QueryStageAnd { // constructor means there is no filter. FetchStage* fetch = new FetchStage(&_txn, &ws, secondScan, NULL, coll); as->addChild(fetch); - ctx.commit(); for (int i = 0; i < 50; i++) { BSONObj obj = getNext(as.get(), &ws); diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index 9cbf2dc22f7..db759de784f 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -60,13 +60,11 @@ namespace QueryStageCollectionScan { bob.append("foo", i); _client.insert(ns(), bob.obj()); } - ctx.commit(); } virtual ~QueryStageCollectionScanBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void remove(const BSONObj& obj) { @@ -90,11 +88,16 @@ namespace QueryStageCollectionScan { // Make a scan and have the runner own it. WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(&_txn, params, ws, filterExpr.get()); - PlanExecutor runner(&_txn, ws, ps, params.collection); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, ps, params.collection, + PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); // Use the runner to count the number of objects scanned. int count = 0; - for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) { ++count; } + for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) { ++count; } return count; } @@ -195,10 +198,15 @@ namespace QueryStageCollectionScan { // Make a scan and have the runner own it. WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(&_txn, params, ws, NULL); - PlanExecutor runner(&_txn, ws, ps, params.collection); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, ps, params.collection, + PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); int count = 0; - for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) { + for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) { // Make sure we get the objects in the order we want ASSERT_EQUALS(count, obj["foo"].numberInt()); ++count; @@ -224,10 +232,15 @@ namespace QueryStageCollectionScan { WorkingSet* ws = new WorkingSet(); PlanStage* ps = new CollectionScan(&_txn, params, ws, NULL); - PlanExecutor runner(&_txn, ws, ps, params.collection); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, ps, params.collection, + PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); int count = 0; - for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) { + for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) { ++count; ASSERT_EQUALS(numObj() - count, obj["foo"].numberInt()); } @@ -293,7 +306,6 @@ namespace QueryStageCollectionScan { ++count; } } - ctx.commit(); ASSERT_EQUALS(numObj(), count); } @@ -355,7 +367,6 @@ namespace QueryStageCollectionScan { ++count; } } - ctx.commit(); ASSERT_EQUALS(numObj(), count); } diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp index 8e18509342e..f0d64edaa1c 100644 --- a/src/mongo/dbtests/query_stage_count.cpp +++ b/src/mongo/dbtests/query_stage_count.cpp @@ -55,7 +55,6 @@ namespace QueryStageCount { virtual ~CountBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void addIndex(const BSONObj& obj) { @@ -117,7 +116,6 @@ namespace QueryStageCount { // Add an index on a:1 addIndex(BSON("a" << 1)); - ctx.commit(); // Set up the count stage CountScanParams params; @@ -151,7 +149,6 @@ namespace QueryStageCount { // Add an index addIndex(BSON("a" << 1)); - ctx.commit(); // Set up the count stage CountScanParams params; @@ -184,7 +181,6 @@ namespace QueryStageCount { // Add an index addIndex(BSON("a" << 1)); - ctx.commit(); // Set up the count stage CountScanParams params; @@ -213,7 +209,6 @@ namespace QueryStageCount { // Insert doc, add index insert(BSON("a" << 2)); addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count, and run CountScanParams params; @@ -243,7 +238,6 @@ namespace QueryStageCount { insert(BSON("a" << 2)); insert(BSON("a" << 3)); addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count, and run CountScanParams params; @@ -274,7 +268,6 @@ namespace QueryStageCount { insert(BSON("a" << 2)); insert(BSON("a" << 4)); addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count, and run CountScanParams params; @@ -306,7 +299,6 @@ namespace QueryStageCount { insert(BSON("a" << i)); } addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count stage CountScanParams params; @@ -358,7 +350,6 @@ namespace QueryStageCount { insert(BSON("a" << i)); } addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count stage CountScanParams params; @@ -386,7 +377,6 @@ namespace QueryStageCount { // Remove remaining objects remove(BSON("a" << GTE << 5)); - ctx.commit(); // Recover from yield count.restoreState(&_txn); @@ -414,7 +404,6 @@ namespace QueryStageCount { insert(BSON("a" << i)); } addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count stage CountScanParams params; @@ -445,7 +434,6 @@ namespace QueryStageCount { // Insert one document after the end insert(BSON("a" << 6.5)); - ctx.commit(); // Recover from yield count.restoreState(&_txn); @@ -473,7 +461,6 @@ namespace QueryStageCount { insert(BSON("a" << i)); } addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count stage CountScanParams params; @@ -501,7 +488,6 @@ namespace QueryStageCount { // Insert a document with two values for 'a' insert(BSON("a" << BSON_ARRAY(10 << 11))); - ctx.commit(); // Recover from yield count.restoreState(&_txn); @@ -533,7 +519,6 @@ namespace QueryStageCount { remove(BSON("a" << 1 << "b" << 0)); remove(BSON("a" << 1 << "b" << 3)); remove(BSON("a" << 1 << "b" << 4)); - ctx.commit(); // Ensure that count does not include unused keys CountScanParams params; @@ -567,7 +552,6 @@ namespace QueryStageCount { // Mark key at end position as 'unused' by deleting remove(BSON("a" << 1 << "b" << 9)); - ctx.commit(); // Run count and check CountScanParams params; @@ -598,7 +582,6 @@ namespace QueryStageCount { insert(BSON("a" << 1 << "b" << i)); } addIndex(BSON("a" << 1)); - ctx.commit(); // Set up count stage CountScanParams params; @@ -626,7 +609,6 @@ namespace QueryStageCount { // Mark the key at position 5 as 'unused' remove(BSON("a" << 1 << "b" << 5)); - ctx.commit(); // Recover from yield count.restoreState(&_txn); diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp index 19ce4fe37cd..2c905e184cb 100644 --- a/src/mongo/dbtests/query_stage_delete.cpp +++ b/src/mongo/dbtests/query_stage_delete.cpp @@ -54,13 +54,11 @@ namespace QueryStageDelete { bob.append("foo", static_cast<long long int>(i)); _client.insert(ns(), bob.obj()); } - ctx.commit(); } virtual ~QueryStageDeleteBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void remove(const BSONObj& obj) { @@ -158,8 +156,6 @@ namespace QueryStageDelete { } ASSERT_EQUALS(numObj() - 1, stats->docsDeleted); - - ctx.commit(); } }; diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp index cd06b3e6fdb..713103d74f1 100644 --- a/src/mongo/dbtests/query_stage_fetch.cpp +++ b/src/mongo/dbtests/query_stage_fetch.cpp @@ -88,12 +88,14 @@ namespace QueryStageFetch { public: void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } + WorkingSet ws; // Add an object to the DB. @@ -101,7 +103,6 @@ namespace QueryStageFetch { set<DiskLoc> locs; getLocs(&locs, coll); ASSERT_EQUALS(size_t(1), locs.size()); - ctx.commit(); // Create a mock stage that returns the WSM. auto_ptr<MockStage> mockStage(new MockStage(&ws)); @@ -147,12 +148,14 @@ namespace QueryStageFetch { public: void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } + WorkingSet ws; // Add an object to the DB. @@ -197,7 +200,6 @@ namespace QueryStageFetch { // No more data to fetch, so, EOF. state = fetchStage->work(&id); ASSERT_EQUALS(PlanStage::IS_EOF, state); - ctx.commit(); } }; diff --git a/src/mongo/dbtests/query_stage_keep.cpp b/src/mongo/dbtests/query_stage_keep.cpp index 2bb6373d865..91c8fb639ba 100644 --- a/src/mongo/dbtests/query_stage_keep.cpp +++ b/src/mongo/dbtests/query_stage_keep.cpp @@ -105,12 +105,14 @@ namespace QueryStageKeep { public: void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } + WorkingSet ws; // Add 10 objects to the collection. @@ -126,7 +128,6 @@ namespace QueryStageKeep { member->obj = BSON("x" << 2); ws.flagForReview(id); } - ctx.commit(); // Create a collscan to provide the 10 objects in the collection. CollectionScanParams params; diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index ab4c027aaf7..a318da31114 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -54,7 +54,6 @@ namespace QueryStageMergeSortTests { virtual ~QueryStageMergeSortTestBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void addIndex(const BSONObj& obj) { @@ -114,7 +113,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } const int N = 50; @@ -149,15 +150,18 @@ namespace QueryStageMergeSortTests { // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); - ctx.commit(); // Must fetch if we want to easily pull out an obj. - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL)); - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) @@ -166,7 +170,7 @@ namespace QueryStageMergeSortTests { // Should be done now. BSONObj foo; - ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&foo, NULL)); + ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL)); } }; @@ -178,7 +182,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } const int N = 50; @@ -213,14 +219,17 @@ namespace QueryStageMergeSortTests { // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); - ctx.commit(); - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL)); - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) @@ -229,7 +238,7 @@ namespace QueryStageMergeSortTests { // Should be done now. BSONObj foo; - ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL)); + ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; @@ -241,7 +250,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } const int N = 50; @@ -276,15 +287,18 @@ namespace QueryStageMergeSortTests { // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); - ctx.commit(); - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; // We inserted N objects but we get 2 * N from the runner because of dups. - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL)); - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) @@ -293,7 +307,7 @@ namespace QueryStageMergeSortTests { // Should be done now. BSONObj foo; - ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL)); + ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; @@ -305,7 +319,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } const int N = 50; @@ -342,14 +358,17 @@ namespace QueryStageMergeSortTests { // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); - ctx.commit(); - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL)); - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(N - i - 1, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) @@ -358,7 +377,7 @@ namespace QueryStageMergeSortTests { // Should be done now. BSONObj foo; - ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL)); + ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; @@ -370,7 +389,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } const int N = 50; @@ -407,21 +428,24 @@ namespace QueryStageMergeSortTests { params.bounds.startKey = BSON("" << 51 << "" << MinKey); params.bounds.endKey = BSON("" << 51 << "" << MaxKey); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); - ctx.commit(); - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); // Only getting results from the a:1 index scan. for (int i = 0; i < N; ++i) { BSONObj obj; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&obj, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(i, obj["c"].numberInt()); ASSERT_EQUALS(1, obj["a"].numberInt()); } // Should be done now. BSONObj foo; - ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL)); + ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; @@ -433,7 +457,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } WorkingSet* ws = new WorkingSet(); @@ -460,13 +486,16 @@ namespace QueryStageMergeSortTests { params.descriptor = getIndex(indexSpec, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); } - ctx.commit(); - PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll); + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); for (int i = 0; i < numIndices; ++i) { BSONObj obj; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&obj, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(i, obj["foo"].numberInt()); string index(1, 'a' + i); ASSERT_EQUALS(1, obj[index].numberInt()); @@ -474,7 +503,7 @@ namespace QueryStageMergeSortTests { // Should be done now. BSONObj foo; - ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL)); + ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; @@ -486,7 +515,9 @@ namespace QueryStageMergeSortTests { Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } WorkingSet ws; @@ -520,7 +551,6 @@ namespace QueryStageMergeSortTests { getLocs(&locs, coll); set<DiskLoc>::iterator it = locs.begin(); - ctx.commit(); // Get 10 results. Should be getting results in order of 'locs'. int count = 0; diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 20834444537..0092f6ccec6 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -122,22 +122,26 @@ namespace QueryStageSortTests { params.limit = limit(); // Must fetch so we can look at the doc as a BSONObj. - PlanExecutor runner(&_txn, - ws, - new FetchStage(&_txn, ws, - new SortStage(&_txn, params, ws, ms), NULL, coll), - coll); + PlanExecutor* rawExec; + Status status = + PlanExecutor::make(&_txn, + ws, + new FetchStage(&_txn, ws, + new SortStage(&_txn, params, ws, ms), NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); // Look at pairs of objects to make sure that the sort order is pairwise (and therefore // totally) correct. BSONObj last; - ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&last, NULL)); + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&last, NULL)); // Count 'last'. int count = 1; BSONObj current; - while (PlanExecutor::ADVANCED == runner.getNext(¤t, NULL)) { + while (PlanExecutor::ADVANCED == exec->getNext(¤t, NULL)) { int cmp = sgn(current.woSortOrder(last, params.pattern)); // The next object should be equal to the previous or oriented according to the sort // pattern. @@ -185,16 +189,16 @@ namespace QueryStageSortTests { void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } fillData(); sortAndCheck(1, coll); - ctx.commit(); } }; @@ -205,16 +209,16 @@ namespace QueryStageSortTests { void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } fillData(); sortAndCheck(-1, coll); - ctx.commit(); } }; @@ -234,16 +238,16 @@ namespace QueryStageSortTests { void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } fillData(); sortAndCheck(-1, coll); - ctx.commit(); } }; @@ -254,12 +258,14 @@ namespace QueryStageSortTests { void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } + fillData(); // The data we're going to later invalidate. @@ -319,7 +325,6 @@ namespace QueryStageSortTests { ASSERT(!member->hasLoc()); ++count; } - ctx.commit(); // Returns all docs. ASSERT_EQUALS(limit() ? limit() : numObj(), count); @@ -345,11 +350,12 @@ namespace QueryStageSortTests { void run() { Client::WriteContext ctx(&_txn, ns()); - Database* db = ctx.ctx().db(); Collection* coll = db->getCollection(&_txn, ns()); if (!coll) { + WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); + wuow.commit(); } WorkingSet* ws = new WorkingSet(); @@ -372,15 +378,18 @@ namespace QueryStageSortTests { params.limit = 0; // We don't get results back since we're sorting some parallel arrays. - PlanExecutor runner(&_txn, - ws, - new FetchStage(&_txn, - ws, - new SortStage(&_txn, params, ws, ms), NULL, coll), - coll); - PlanExecutor::ExecState runnerState = runner.getNext(NULL, NULL); + PlanExecutor* rawExec; + Status status = + PlanExecutor::make(&_txn, + ws, + new FetchStage(&_txn, + ws, + new SortStage(&_txn, params, ws, ms), NULL, coll), + coll, PlanExecutor::YIELD_MANUAL, &rawExec); + boost::scoped_ptr<PlanExecutor> exec(rawExec); + + PlanExecutor::ExecState runnerState = exec->getNext(NULL, NULL); ASSERT_EQUALS(PlanExecutor::EXEC_ERROR, runnerState); - ctx.commit(); } }; diff --git a/src/mongo/dbtests/query_stage_subplan.cpp b/src/mongo/dbtests/query_stage_subplan.cpp index d5f1c5e5801..e431fcb68db 100644 --- a/src/mongo/dbtests/query_stage_subplan.cpp +++ b/src/mongo/dbtests/query_stage_subplan.cpp @@ -47,7 +47,6 @@ namespace QueryStageSubplan { virtual ~QueryStageSubplanBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void addIndex(const BSONObj& obj) { @@ -93,12 +92,13 @@ namespace QueryStageSubplan { QueryPlannerParams plannerParams; fillOutPlannerParams(&_txn, collection, cq, &plannerParams); - // We expect creation of the subplan stage to fail. WorkingSet ws; - SubplanStage* subplan; - ASSERT_NOT_OK(SubplanStage::make(&_txn, collection, &ws, plannerParams, cq, &subplan)); + boost::scoped_ptr<SubplanStage> subplan(new SubplanStage(&_txn, collection, &ws, + plannerParams, cq)); - ctx.commit(); + // NULL means that 'subplan' will not yield during plan selection. Plan selection + // should succeed due to falling back on regular planning. + ASSERT_OK(subplan->pickBestPlan(NULL)); } }; diff --git a/src/mongo/dbtests/query_stage_tests.cpp b/src/mongo/dbtests/query_stage_tests.cpp index ea59b791c5a..c0b203ef0b9 100644 --- a/src/mongo/dbtests/query_stage_tests.cpp +++ b/src/mongo/dbtests/query_stage_tests.cpp @@ -59,19 +59,16 @@ namespace QueryStageTests { addIndex(BSON("foo" << 1)); addIndex(BSON("foo" << 1 << "baz" << 1)); - ctx.commit(); } virtual ~IndexScanBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void addIndex(const BSONObj& obj) { Client::WriteContext ctx(&_txn, ns()); _client.ensureIndex(ns(), obj); - ctx.commit(); } int countResults(const IndexScanParams& params, BSONObj filterObj = BSONObj()) { @@ -82,13 +79,19 @@ namespace QueryStageTests { auto_ptr<MatchExpression> filterExpr(swme.getValue()); WorkingSet* ws = new WorkingSet(); - PlanExecutor runner(&_txn, - ws, - new IndexScan(&_txn, params, ws, filterExpr.get()), - ctx.getCollection()); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make(&_txn, + ws, + new IndexScan(&_txn, params, ws, filterExpr.get()), + ctx.getCollection(), + PlanExecutor::YIELD_MANUAL, + &rawExec); + ASSERT_OK(status); + boost::scoped_ptr<PlanExecutor> exec(rawExec); int count = 0; - for (DiskLoc dl; PlanExecutor::ADVANCED == runner.getNext(NULL, &dl); ) { + for (DiskLoc dl; PlanExecutor::ADVANCED == exec->getNext(NULL, &dl); ) { ++count; } @@ -103,7 +106,6 @@ namespace QueryStageTests { double lng = double(rand()) / RAND_MAX; _client.insert(ns(), BSON("geo" << BSON_ARRAY(lng << lat))); } - ctx.commit(); } IndexDescriptor* getIndex(const BSONObj& obj) { diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp index 2ca29438a93..7be23ea50d5 100644 --- a/src/mongo/dbtests/query_stage_update.cpp +++ b/src/mongo/dbtests/query_stage_update.cpp @@ -57,13 +57,11 @@ namespace QueryStageUpdate { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); _client.createCollection(ns()); - ctx.commit(); } virtual ~QueryStageUpdateBase() { Client::WriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); - ctx.commit(); } void insert(const BSONObj& doc) { @@ -218,7 +216,6 @@ namespace QueryStageUpdate { new UpdateStage(params, ws.get(), db, eofStage.release())); runUpdate(updateStage.get()); - ctx.commit(); } // Verify the contents of the resulting collection. @@ -323,8 +320,6 @@ namespace QueryStageUpdate { ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state); } - ctx.commit(); - // 4 of the 5 matching documents should have been modified (one was deleted). ASSERT_EQUALS(4U, stats->nModified); ASSERT_EQUALS(4U, stats->nMatched); diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 4467de0a3d8..57b9632e94e 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -261,7 +261,6 @@ namespace QueryTests { // ASSERT( clientCursor.c()->pq ); // ASSERT_EQUALS( 2, clientCursor.c()->pq->getNumToReturn() ); ASSERT_EQUALS( 2, clientCursor.c()->pos() ); - ctx.commit(); } cursor = _client.getMore( ns, cursorId ); @@ -1206,8 +1205,13 @@ namespace QueryTests { // note that extents are always at least 4KB now - so this will get rounded up // a bit. - ASSERT( userCreateNS(&_txn, ctx.db(), ns(), - fromjson( "{ capped : true, size : 2000 }" ), false ).isOK() ); + { + WriteUnitOfWork wunit(&_txn); + ASSERT( userCreateNS(&_txn, ctx.db(), ns(), + fromjson( "{ capped : true, size : 2000 }" ), false ).isOK() ); + wunit.commit(); + } + for (int i = 0; i < 200; i++) { insertNext(); ASSERT(count() < 90); @@ -1230,7 +1234,6 @@ namespace QueryTests { for ( int i=0; i<90; i++ ) { insertNext(); } - ctx.commit(); while ( c->more() ) { c->next(); } } @@ -1257,7 +1260,6 @@ namespace QueryTests { for ( int i=0; i<50; i++ ) { insert( ns() , BSON( "_id" << i << "x" << i * 2 ) ); } - ctx.commit(); ASSERT_EQUALS( 50 , count() ); @@ -1312,7 +1314,6 @@ namespace QueryTests { for ( int i=0; i<1000; i+=2 ) { _client.remove( ns() , BSON( "_id" << i ) ); } - ctx.commit(); BSONObj res; for ( int i=0; i<1000; i++ ) { @@ -1333,7 +1334,6 @@ namespace QueryTests { for ( int i=0; i<1000; i++ ) { insert( ns() , BSON( "_id" << i << "x" << i * 2 ) ); } - ctx.commit(); } }; @@ -1532,7 +1532,6 @@ namespace QueryTests { str::stream() << "Cannot kill active cursor " << cursorId; ASSERT_THROWS_WHAT(CollectionCursorCache::eraseCursorGlobal(&_txn, cursorId), MsgAssertionException, expectedAssertion); - ctx.commit(); } // Verify that the remaining document is read from the cursor. diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index c013594f4da..7d036168084 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -77,6 +77,7 @@ namespace ReplTests { createOplog(&_txn); Client::WriteContext ctx(&_txn, ns()); + WriteUnitOfWork wuow(&_txn); Collection* c = ctx.ctx().db()->getCollection(&_txn, ns()); if ( ! c ) { @@ -84,7 +85,7 @@ namespace ReplTests { } ASSERT(c->getIndexCatalog()->haveIdIndex(&_txn)); - ctx.commit(); + wuow.commit(); } ~Base() { try { diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 9a3248a8578..a992d044be4 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -441,8 +441,15 @@ namespace mongo { invariant( _deleteNotifyExec.get() == NULL ); WorkingSet* ws = new WorkingSet(); DeleteNotificationStage* dns = new DeleteNotificationStage(); + PlanExecutor* deleteNotifyExec; // Takes ownership of 'ws' and 'dns'. - PlanExecutor* deleteNotifyExec = new PlanExecutor(txn, ws, dns, collection); + Status execStatus = PlanExecutor::make(txn, + ws, + dns, + collection, + PlanExecutor::YIELD_MANUAL, + &deleteNotifyExec); + invariant(execStatus.isOK()); deleteNotifyExec->registerExec(); _deleteNotifyExec.reset(deleteNotifyExec); @@ -698,7 +705,6 @@ namespace mongo { return NULL; } virtual std::vector<PlanStage*> getChildren() const { - invariant( false ); vector<PlanStage*> empty; return empty; } @@ -1730,13 +1736,14 @@ namespace mongo { if ( entry["options"].isABSONObj() ) options = entry["options"].Obj(); + WriteUnitOfWork wuow(txn); Status status = userCreateNS( txn, db, ns, options, true, false ); if ( !status.isOK() ) { warning() << "failed to create collection [" << ns << "] " << " with options " << options << ": " << status; } + wuow.commit(); } - ctx.commit(); } { @@ -1912,7 +1919,6 @@ namespace mongo { } Helpers::upsert( txn, ns, o, true ); - cx.commit(); } thisTime++; numCloned++; @@ -2121,7 +2127,6 @@ namespace mongo { BSONObjIterator i( xfer["deleted"].Obj() ); while ( i.more() ) { Lock::CollectionLock clk(txn->lockState(), ns, MODE_X); - WriteUnitOfWork wunit(txn); Client::Context ctx(txn, ns); BSONObj id = i.next().Obj(); @@ -2150,7 +2155,6 @@ namespace mongo { true /* fromMigrate */); *lastOpApplied = ctx.getClient()->getLastOp().asDate(); - wunit.commit(); didAnything = true; } } @@ -2180,7 +2184,6 @@ namespace mongo { Helpers::upsert( txn, ns , it , true ); *lastOpApplied = cx.ctx().getClient()->getLastOp().asDate(); - cx.commit(); didAnything = true; } } |