summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2014-10-21 10:24:24 -0400
committerDavid Storch <david.storch@10gen.com>2014-10-21 10:32:59 -0400
commit011dde7e6eac3b73cb1d2a7f004feee9bed99c46 (patch)
tree32b20bc3224627c93e5781ba72abb45a1f825b37 /src
parent0bee61d26e44e26c2678d550990a57ce488f222d (diff)
downloadmongo-011dde7e6eac3b73cb1d2a7f004feee9bed99c46.tar.gz
SERVER-15541 SERVER-15652 implement timing-based yielding
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/client.cpp5
-rw-r--r--src/mongo/db/client.h4
-rw-r--r--src/mongo/db/commands/count.cpp21
-rw-r--r--src/mongo/db/commands/distinct.cpp8
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp12
-rw-r--r--src/mongo/db/commands/find_cmd.cpp8
-rw-r--r--src/mongo/db/commands/geo_near_cmd.cpp3
-rw-r--r--src/mongo/db/commands/group.cpp14
-rw-r--r--src/mongo/db/commands/mr.cpp35
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp16
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp21
-rw-r--r--src/mongo/db/commands/repair_cursor.cpp21
-rw-r--r--src/mongo/db/commands/test_commands.cpp6
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp4
-rw-r--r--src/mongo/db/dbcommands.cpp3
-rw-r--r--src/mongo/db/dbhelpers.cpp11
-rw-r--r--src/mongo/db/exec/delete.cpp6
-rw-r--r--src/mongo/db/exec/multi_plan.cpp25
-rw-r--r--src/mongo/db/exec/multi_plan.h19
-rw-r--r--src/mongo/db/exec/stagedebug_cmd.cpp8
-rw-r--r--src/mongo/db/exec/subplan.cpp130
-rw-r--r--src/mongo/db/exec/subplan.h48
-rw-r--r--src/mongo/db/exec/update.cpp31
-rw-r--r--src/mongo/db/exec/update.h5
-rw-r--r--src/mongo/db/fts/fts_command_mongod.cpp6
-rw-r--r--src/mongo/db/ops/delete_executor.cpp24
-rw-r--r--src/mongo/db/ops/update_executor.cpp26
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp20
-rw-r--r--src/mongo/db/query/get_executor.cpp132
-rw-r--r--src/mongo/db/query/get_executor.h9
-rw-r--r--src/mongo/db/query/internal_plans.h32
-rw-r--r--src/mongo/db/query/new_find.cpp30
-rw-r--r--src/mongo/db/query/plan_executor.cpp191
-rw-r--r--src/mongo/db/query/plan_executor.h123
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp69
-rw-r--r--src/mongo/db/query/plan_yield_policy.h29
-rw-r--r--src/mongo/db/ttl.cpp1
-rw-r--r--src/mongo/dbtests/clienttests.cpp1
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp6
-rw-r--r--src/mongo/dbtests/executor_registry.cpp35
-rw-r--r--src/mongo/dbtests/indexcatalogtests.cpp9
-rw-r--r--src/mongo/dbtests/indexupdatetests.cpp4
-rw-r--r--src/mongo/dbtests/plan_ranking.cpp8
-rw-r--r--src/mongo/dbtests/query_multi_plan_runner.cpp22
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp31
-rw-r--r--src/mongo/dbtests/query_stage_and.cpp61
-rw-r--r--src/mongo/dbtests/query_stage_collscan.cpp31
-rw-r--r--src/mongo/dbtests/query_stage_count.cpp18
-rw-r--r--src/mongo/dbtests/query_stage_delete.cpp4
-rw-r--r--src/mongo/dbtests/query_stage_fetch.cpp10
-rw-r--r--src/mongo/dbtests/query_stage_keep.cpp5
-rw-r--r--src/mongo/dbtests/query_stage_merge_sort.cpp90
-rw-r--r--src/mongo/dbtests/query_stage_sort.cpp57
-rw-r--r--src/mongo/dbtests/query_stage_subplan.cpp10
-rw-r--r--src/mongo/dbtests/query_stage_tests.cpp20
-rw-r--r--src/mongo/dbtests/query_stage_update.cpp5
-rw-r--r--src/mongo/dbtests/querytests.cpp15
-rw-r--r--src/mongo/dbtests/repltests.cpp3
-rw-r--r--src/mongo/s/d_migrate.cpp17
59 files changed, 1031 insertions, 587 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 9b8bb0ae780..496dbae6cae 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -264,13 +264,8 @@ namespace mongo {
_nss(ns),
_dblk(opCtx->lockState(), _nss.db(), MODE_IX),
_collk(opCtx->lockState(), ns, MODE_IX),
- _wunit(opCtx),
_c(opCtx, ns) { }
- void Client::WriteContext::commit() {
- _wunit.commit();
- }
-
void Client::Context::checkNotStale() const {
switch ( _client->_curOp->getOp() ) {
case dbGetMore: // getMore's are special and should be handled else where
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index 8f82c5d67d3..7fb00403de9 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -183,9 +183,6 @@ namespace mongo {
public:
WriteContext(OperationContext* opCtx, const std::string& ns);
- /** Commit any writes done so far in this context. */
- void commit();
-
Database* db() const { return _c.db(); }
Collection* getCollection() const {
@@ -199,7 +196,6 @@ namespace mongo {
NamespaceString _nss;
Lock::DBLock _dblk;
Lock::CollectionLock _collk;
- WriteUnitOfWork _wunit;
Context _c;
};
diff --git a/src/mongo/db/commands/count.cpp b/src/mongo/db/commands/count.cpp
index cb3d6587a16..8ffb02baa13 100644
--- a/src/mongo/db/commands/count.cpp
+++ b/src/mongo/db/commands/count.cpp
@@ -83,13 +83,16 @@ namespace mongo {
Collection* collection = ctx.getCollection();
PlanExecutor* rawExec;
- Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec);
+ Status getExecStatus = getExecutorCount(txn,
+ collection,
+ request,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec);
if (!getExecStatus.isOK()) {
return getExecStatus;
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
return Explain::explainStages(exec.get(), verbosity, out);
}
@@ -111,13 +114,16 @@ namespace mongo {
Collection* collection = ctx.getCollection();
PlanExecutor* rawExec;
- Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec);
+ Status getExecStatus = getExecutorCount(txn,
+ collection,
+ request,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec);
if (!getExecStatus.isOK()) {
return appendCommandStatus(result, getExecStatus);
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
// Store the plan summary string in CurOp.
if (NULL != txn->getCurOp()) {
@@ -258,7 +264,11 @@ namespace mongo {
}
PlanExecutor* rawExec;
- Status getExecStatus = getExecutorCount(txn, collection, request, &rawExec);
+ Status getExecStatus = getExecutorCount(txn,
+ collection,
+ request,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec);
if (!getExecStatus.isOK()) {
err = getExecStatus.reason();
errCode = getExecStatus.code();
@@ -266,7 +276,6 @@ namespace mongo {
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
// Store the plan summary string in CurOp.
if (NULL != txn->getCurOp()) {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index 1da82ccc605..16f2db2b2e9 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -110,7 +110,12 @@ namespace mongo {
}
PlanExecutor* rawExec;
- Status status = getExecutorDistinct(txn, collection, query, key, &rawExec);
+ Status status = getExecutorDistinct(txn,
+ collection,
+ query,
+ key,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec);
if (!status.isOK()) {
uasserted(17216, mongoutils::str::stream() << "Can't get runner for query "
<< query << ": " << status.toString());
@@ -118,7 +123,6 @@ namespace mongo {
}
auto_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
BSONObj obj;
PlanExecutor::ExecState state;
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 5b9fbdc7c3d..54e20e0f035 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -189,10 +189,14 @@ namespace mongo {
PlanExecutor* rawExec;
massert(17384, "Could not get plan executor for query " + queryOriginal.toString(),
- getExecutor(txn, collection, cq, &rawExec, QueryPlannerParams::DEFAULT).isOK());
+ getExecutor(txn,
+ collection,
+ cq,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ QueryPlannerParams::DEFAULT).isOK());
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
PlanExecutor::ExecState state;
if (PlanExecutor::ADVANCED == (state = exec->getNext(&doc, NULL))) {
@@ -200,6 +204,8 @@ namespace mongo {
}
}
+ WriteUnitOfWork wuow(txn);
+
BSONObj queryModified = queryOriginal;
if ( found && doc["_id"].type() && ! CanonicalQuery::isSimpleIdQuery( queryOriginal ) ) {
// we're going to re-write the query to be more efficient
@@ -335,7 +341,7 @@ namespace mongo {
}
}
- cx.commit();
+ wuow.commit();
return true;
}
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 8f39e2624a6..d0194e9d1eb 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -103,7 +103,12 @@ namespace mongo {
options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
- execStatus = getExecutor(txn, collection, cq.release(), &rawExec, options);
+ execStatus = getExecutor(txn,
+ collection,
+ cq.release(),
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ options);
}
if (!execStatus.isOK()) {
@@ -111,7 +116,6 @@ namespace mongo {
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
// Got the execution tree. Explain it.
return Explain::explainStages(exec.get(), verbosity, out);
diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp
index eb60bbdf09d..e47c0bac76d 100644
--- a/src/mongo/db/commands/geo_near_cmd.cpp
+++ b/src/mongo/db/commands/geo_near_cmd.cpp
@@ -182,13 +182,12 @@ namespace mongo {
}
PlanExecutor* rawExec;
- if (!getExecutor(txn, collection, cq, &rawExec, 0).isOK()) {
+ if (!getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, 0).isOK()) {
errmsg = "can't get query runner";
return false;
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
double totalDistance = 0;
BSONObjBuilder resultBuilder(result.subarrayStart("results"));
diff --git a/src/mongo/db/commands/group.cpp b/src/mongo/db/commands/group.cpp
index eb426a2504c..6f1a8a7153d 100644
--- a/src/mongo/db/commands/group.cpp
+++ b/src/mongo/db/commands/group.cpp
@@ -138,13 +138,16 @@ namespace mongo {
Collection* coll = ctx.getCollection();
PlanExecutor *rawPlanExecutor;
- Status getExecStatus = getExecutorGroup(txn, coll, groupRequest, &rawPlanExecutor);
+ Status getExecStatus = getExecutorGroup(txn,
+ coll,
+ groupRequest,
+ PlanExecutor::YIELD_AUTO,
+ &rawPlanExecutor);
if (!getExecStatus.isOK()) {
return appendCommandStatus(out, getExecStatus);
}
scoped_ptr<PlanExecutor> planExecutor(rawPlanExecutor);
- planExecutor->setYieldPolicy(PlanExecutor::YIELD_AUTO);
// Group executors return ADVANCED exactly once, with the entire group result.
BSONObj retval;
@@ -192,13 +195,16 @@ namespace mongo {
Collection* coll = ctx.getCollection();
PlanExecutor *rawPlanExecutor;
- Status getExecStatus = getExecutorGroup(txn, coll, groupRequest, &rawPlanExecutor);
+ Status getExecStatus = getExecutorGroup(txn,
+ coll,
+ groupRequest,
+ PlanExecutor::YIELD_AUTO,
+ &rawPlanExecutor);
if (!getExecStatus.isOK()) {
return getExecStatus;
}
scoped_ptr<PlanExecutor> planExecutor(rawPlanExecutor);
- planExecutor->setYieldPolicy(PlanExecutor::YIELD_AUTO);
return Explain::explainStages(planExecutor.get(), verbosity, out);
}
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 537543f5d75..a0814eb21dc 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -359,6 +359,7 @@ namespace mongo {
// Create the inc collection and make sure we have index on "0" key.
// Intentionally not replicating the inc collection to secondaries.
Client::WriteContext incCtx(_txn, _config.incLong);
+ WriteUnitOfWork wuow(_txn);
Collection* incColl = incCtx.getCollection();
invariant(!incColl);
@@ -376,7 +377,7 @@ namespace mongo {
uasserted( 17305 , str::stream() << "createIndex failed for mr incLong ns: " <<
_config.incLong << " err: " << status.code() );
}
- incCtx.commit();
+ wuow.commit();
}
vector<BSONObj> indexesToInsert;
@@ -384,6 +385,7 @@ namespace mongo {
{
// copy indexes into temporary storage
Client::WriteContext finalCtx(_txn, _config.outputOptions.finalNamespace);
+ WriteUnitOfWork wuow(_txn);
Collection* const finalColl = finalCtx.getCollection();
if ( finalColl ) {
IndexCatalog::IndexIterator ii =
@@ -406,12 +408,13 @@ namespace mongo {
indexesToInsert.push_back( b.obj() );
}
}
- finalCtx.commit();
+ wuow.commit();
}
{
// create temp collection and insert the indexes from temporary storage
Client::WriteContext tempCtx(_txn, _config.tempNamespace);
+ WriteUnitOfWork wuow(_txn);
uassert(ErrorCodes::NotMaster, "no longer master",
repl::getGlobalReplicationCoordinator()->
canAcceptWritesForDatabase(nsToDatabase(_config.tempNamespace.c_str())));
@@ -443,7 +446,7 @@ namespace mongo {
string logNs = nsToDatabase( _config.tempNamespace ) + ".system.indexes";
repl::logOp(_txn, "i", logNs.c_str(), *it);
}
- tempCtx.commit();
+ wuow.commit();
}
}
@@ -664,6 +667,7 @@ namespace mongo {
Client::WriteContext ctx(_txn, ns );
+ WriteUnitOfWork wuow(_txn);
uassert(ErrorCodes::NotMaster, "no longer master",
repl::getGlobalReplicationCoordinator()->
canAcceptWritesForDatabase(nsToDatabase(ns.c_str())));
@@ -680,7 +684,7 @@ namespace mongo {
coll->insertDocument( _txn, bo, true );
repl::logOp(_txn, "i", ns.c_str(), bo);
- ctx.commit();
+ wuow.commit();
}
/**
@@ -690,9 +694,10 @@ namespace mongo {
verify( _onDisk );
Client::WriteContext ctx(_txn, _config.incLong );
+ WriteUnitOfWork wuow(_txn);
Collection* coll = getCollectionOrUassert(ctx.db(), _config.incLong);
coll->insertDocument( _txn, o, true );
- ctx.commit();
+ wuow.commit();
}
State::State(OperationContext* txn, const Config& c) :
@@ -969,6 +974,7 @@ namespace mongo {
{
Client::WriteContext incCtx(_txn, _config.incLong );
+ WriteUnitOfWork wuow(_txn);
Collection* incColl = getCollectionOrUassert(incCtx.db(), _config.incLong );
bool foundIndex = false;
@@ -983,9 +989,9 @@ namespace mongo {
break;
}
}
- incCtx.commit();
verify( foundIndex );
+ wuow.commit();
}
scoped_ptr<AutoGetCollectionForRead> ctx(new AutoGetCollectionForRead(_txn, _config.incLong));
@@ -1009,11 +1015,14 @@ namespace mongo {
whereCallback).isOK());
PlanExecutor* rawExec;
- verify(getExecutor(_txn, getCollectionOrUassert(ctx->getDb(), _config.incLong),
- cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK());
+ verify(getExecutor(_txn,
+ getCollectionOrUassert(ctx->getDb(), _config.incLong),
+ cq,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ QueryPlannerParams::NO_TABLE_SCAN).isOK());
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
// iterate over all sorted objects
BSONObj o;
@@ -1357,15 +1366,17 @@ namespace mongo {
invariant(db);
PlanExecutor* rawExec;
- if (!getExecutor(txn, state.getCollectionOrUassert(db, config.ns),
- cq, &rawExec).isOK()) {
+ if (!getExecutor(txn,
+ state.getCollectionOrUassert(db, config.ns),
+ cq,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec).isOK()) {
uasserted(17239, "Can't get executor for query "
+ config.filter.toString());
return 0;
}
scoped_ptr<PlanExecutor> exec(rawExec);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
Timer mt;
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index a8f65aa63f9..2f4c8fe34d3 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -102,13 +102,17 @@ namespace mongo {
for ( size_t i = 0; i < numCursors; i++ ) {
WorkingSet* ws = new WorkingSet();
MultiIteratorStage* mis = new MultiIteratorStage(txn, ws, collection);
- // Takes ownership of 'ws' and 'mis'.
- auto_ptr<PlanExecutor> curExec(new PlanExecutor(txn, ws, mis, collection));
- // Each of the plan executors should yield automatically. We pass "false" to
- // indicate that 'curExec' should not register itself, as it will get registered
- // by ClientCursor instead.
- curExec->setYieldPolicy(PlanExecutor::YIELD_AUTO, false);
+ PlanExecutor* rawExec;
+ // Takes ownership of 'ws' and 'mis'.
+ Status execStatus = PlanExecutor::make(txn, ws, mis, collection,
+ PlanExecutor::YIELD_AUTO, &rawExec);
+ invariant(execStatus.isOK());
+ auto_ptr<PlanExecutor> curExec(rawExec);
+
+ // The PlanExecutor was registered on construction due to the YIELD_AUTO policy.
+ // We have to deregister it, as it will be registered with ClientCursor.
+ curExec->deregisterExec();
// Need to save state while yielding locks between now and newGetMore.
curExec->saveState();
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 0ae4f12ebe0..ed6ee3323c2 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -245,16 +245,25 @@ namespace mongo {
auto_ptr<WorkingSet> ws(new WorkingSet());
auto_ptr<PipelineProxyStage> proxy(
new PipelineProxyStage(pPipeline, input, ws.get()));
+ Status execStatus = Status::OK();
if (NULL == collection) {
- execHolder.reset(new PlanExecutor(txn, ws.release(), proxy.release(), ns));
+ execStatus = PlanExecutor::make(txn,
+ ws.release(),
+ proxy.release(),
+ ns,
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
}
else {
- execHolder.reset(new PlanExecutor(txn,
- ws.release(),
- proxy.release(),
- collection));
+ execStatus = PlanExecutor::make(txn,
+ ws.release(),
+ proxy.release(),
+ collection,
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
}
- exec = execHolder.get();
+ invariant(execStatus.isOK());
+ execHolder.reset(exec);
if (!collection && input) {
// If we don't have a collection, we won't be able to register any executors, so
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index 92544df2a6d..67459d7a9c8 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -79,13 +79,20 @@ namespace mongo {
collection));
stage->addIterator(iter.release());
- std::auto_ptr<PlanExecutor> exec(new PlanExecutor(txn,
- ws.release(),
- stage.release(),
- collection));
-
- // 'exec' will be used in newGetMore(). Set its yield policy and save its state.
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ PlanExecutor* rawExec;
+ Status execStatus = PlanExecutor::make(txn,
+ ws.release(),
+ stage.release(),
+ collection,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec);
+ invariant(execStatus.isOK());
+ std::auto_ptr<PlanExecutor> exec(rawExec);
+
+ // 'exec' will be used in newGetMore(). It was automatically registered on construction
+ // due to the auto yield policy, so it could yield during plan selection. We deregister
+ // it now so that it can be registered with ClientCursor.
+ exec->deregisterExec();
exec->saveState();
// ClientCursors' constructor inserts them into a global map that manages their
diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp
index 405d2c77948..faf5965ec04 100644
--- a/src/mongo/db/commands/test_commands.cpp
+++ b/src/mongo/db/commands/test_commands.cpp
@@ -165,8 +165,9 @@ namespace mongo {
massert( 13418, "captrunc invalid n", PlanExecutor::ADVANCED == state);
}
}
+ WriteUnitOfWork wuow(txn);
collection->temp_cappedTruncateAfter( txn, end, inc );
- ctx.commit();
+ wuow.commit();
return true;
}
};
@@ -200,6 +201,7 @@ namespace mongo {
NamespaceString nss( dbname, coll );
Client::WriteContext ctx(txn, nss.ns() );
+ WriteUnitOfWork wuow(txn);
Database* db = ctx.db();
Collection* collection = ctx.getCollection();
massert( 13429, "emptycapped no such collection", collection );
@@ -214,7 +216,7 @@ namespace mongo {
if (!fromRepl)
repl::logOp(txn, "c",(dbname + ".$cmd").c_str(), cmdObj);
- ctx.commit();
+ wuow.commit();
return true;
}
};
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 6bc0bef54f1..5b2026649ca 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -634,13 +634,13 @@ namespace mongo {
// Step 1: Unlock all requests that are not-flush and not-global.
for (size_t i = 0; i < stateOut->locks.size(); ++i) {
for (size_t j = 0; j < stateOut->locks[i].recursiveCount; ++j) {
- unlock(stateOut->locks[i].resourceId);
+ invariant(unlock(stateOut->locks[i].resourceId));
}
}
// Step 2: Unlock the global lock.
for (size_t i = 0; i < stateOut->globalRecursiveCount; ++i) {
- unlock(resourceIdGlobal);
+ invariant(unlock(resourceIdGlobal));
}
// Step 3: Unlock flush. It's only acquired on the first global lock acquisition
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 499a6dfd965..d788f8b7970 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -645,7 +645,8 @@ namespace mongo {
}
PlanExecutor* rawExec;
- if (!getExecutor(txn, coll, cq, &rawExec, QueryPlannerParams::NO_TABLE_SCAN).isOK()) {
+ if (!getExecutor(txn, coll, cq, PlanExecutor::YIELD_MANUAL, &rawExec,
+ QueryPlannerParams::NO_TABLE_SCAN).isOK()) {
uasserted(17241, "Can't get executor for query " + query.toString());
return 0;
}
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 334e87895d8..2adb42d3d87 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -130,7 +130,12 @@ namespace mongo {
PlanExecutor* rawExec;
size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT;
massert(17245, "Could not get executor for query " + query.toString(),
- getExecutor(txn, collection, cq, &rawExec, options).isOK());
+ getExecutor(txn,
+ collection,
+ cq,
+ PlanExecutor::YIELD_MANUAL,
+ &rawExec,
+ options).isOK());
auto_ptr<PlanExecutor> exec(rawExec);
PlanExecutor::ExecState state;
@@ -408,6 +413,8 @@ namespace mongo {
verify(PlanExecutor::ADVANCED == state);
+ WriteUnitOfWork wuow(txn);
+
if ( onlyRemoveOrphanedDocs ) {
// Do a final check in the write lock to make absolutely sure that our
// collection hasn't been modified in a way that invalidates our migration
@@ -446,7 +453,7 @@ namespace mongo {
collection->deleteDocument( txn, rloc, false, false, &deletedId );
// The above throws on failure, and so is not logged
repl::logOp(txn, "d", ns.c_str(), deletedId, 0, 0, fromMigrate);
- ctx.commit();
+ wuow.commit();
numDeleted++;
}
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
index ddfeeaaffd0..f4633af95a8 100644
--- a/src/mongo/db/exec/delete.cpp
+++ b/src/mongo/db/exec/delete.cpp
@@ -164,6 +164,12 @@ namespace mongo {
_txn = opCtx;
++_commonStats.unyields;
_child->restoreState(opCtx);
+
+ const NamespaceString& ns(_collection->ns());
+ massert(28537,
+ str::stream() << "Demoted from primary while removing from " << ns.ns(),
+ !_params.shouldCallLogOp ||
+ repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db()));
}
void DeleteStage::invalidate(const DiskLoc& dl, InvalidationType type) {
diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp
index 3db8e3e4dd7..686555d15fd 100644
--- a/src/mongo/db/exec/multi_plan.cpp
+++ b/src/mongo/db/exec/multi_plan.cpp
@@ -148,7 +148,7 @@ namespace mongo {
return state;
}
- void MultiPlanStage::pickBestPlan() {
+ Status MultiPlanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
// Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
// execution work that happens here, so this is needed for the time accounting to
// make sense.
@@ -182,11 +182,28 @@ namespace mongo {
// Work the plans, stopping when a plan hits EOF or returns some
// fixed number of results.
for (size_t ix = 0; ix < numWorks; ++ix) {
+ // Yield, if it's time to yield.
+ if (NULL != yieldPolicy && yieldPolicy->shouldYield()) {
+ bool alive = yieldPolicy->yield();
+ if (!alive) {
+ _failure = true;
+ Status failStat(ErrorCodes::OperationFailed,
+ "PlanExecutor killed during plan selection");
+ _statusMemberId = WorkingSetCommon::allocateStatusMember(_candidates[0].ws,
+ failStat);
+ return failStat;
+ }
+ }
+
bool moreToDo = workAllPlans(numResults);
if (!moreToDo) { break; }
}
- if (_failure) { return; }
+ if (_failure) {
+ invariant(WorkingSet::INVALID_ID != _statusMemberId);
+ WorkingSetMember* member = _candidates[0].ws->get(_statusMemberId);
+ return WorkingSetCommon::getMemberStatus(*member);
+ }
// After picking best plan, ranking will own plan stats from
// candidate solutions (winner and losers).
@@ -290,6 +307,8 @@ namespace mongo {
_collection->infoCache()->getPlanCache()->add(*_query, solutions, ranking.release());
}
}
+
+ return Status::OK();
}
vector<PlanStageStats*> MultiPlanStage::generateCandidateStats() {
@@ -341,8 +360,6 @@ namespace mongo {
// Propagate most recent seen failure to parent.
if (PlanStage::FAILURE == state) {
- BSONObj objOut;
- WorkingSetCommon::getStatusMemberObject(*candidate.ws, id, &objOut);
_statusMemberId = id;
}
diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h
index 964d71443d0..956969eb2d2 100644
--- a/src/mongo/db/exec/multi_plan.h
+++ b/src/mongo/db/exec/multi_plan.h
@@ -36,6 +36,7 @@
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/query_solution.h"
#include "mongo/db/query/plan_ranker.h"
+#include "mongo/db/query/plan_yield_policy.h"
namespace mongo {
@@ -81,9 +82,15 @@ namespace mongo {
/**
* Runs all plans added by addPlan, ranks them, and picks a best.
- * All further calls to getNext(...) will return results from the best plan.
+ * All further calls to work(...) will return results from the best plan.
+ *
+ * If 'yieldPolicy' is non-NULL, then all locks may be yielded in between round-robin
+ * works of the candidate plans. By default, 'yieldPolicy' is NULL and no yielding will
+ * take place.
+ *
+ * Returns a non-OK status if the plan was killed during yield.
*/
- void pickBestPlan();
+ Status pickBestPlan(PlanYieldPolicy* yieldPolicy);
/** Return true if a best plan has been chosen */
bool bestPlanChosen() const;
@@ -156,20 +163,24 @@ namespace mongo {
// uses -1 / kNoSuchPlan when best plan is not (yet) known
int _backupPlanIdx;
- // Did all plans fail while we were running them? Note that one plan can fail
+ // Set if this MultiPlanStage cannot continue, and the query must fail. This can happen in
+ // two ways. The first is that all candidate plans fail. Note that one plan can fail
// during normal execution of the plan competition. Here is an example:
//
// Plan 1: collection scan with sort. Sort runs out of memory.
// Plan 2: ixscan that provides sort. Won't run out of memory.
//
// We want to choose plan 2 even if plan 1 fails.
+ //
+ // The second way for failure to occur is that the execution of this query is killed during
+ // a yield, by some concurrent event such as a collection drop.
bool _failure;
// If everything fails during the plan competition, we can't pick one.
size_t _failureCount;
// if pickBestPlan fails, this is set to the wsid of the statusMember
- // returned by ::work()
+ // returned by ::work()
WorkingSetID _statusMemberId;
// Stats
diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp
index 75bcfb3dce9..e39024a5205 100644
--- a/src/mongo/db/exec/stagedebug_cmd.cpp
+++ b/src/mongo/db/exec/stagedebug_cmd.cpp
@@ -147,11 +147,15 @@ namespace mongo {
// TODO: Do we want to do this for the user? I think so.
PlanStage* rootFetch = new FetchStage(txn, ws.get(), userRoot, NULL, collection);
- PlanExecutor runner(txn, ws.release(), rootFetch, collection);
+ PlanExecutor* rawExec;
+ Status execStatus = PlanExecutor::make(txn, ws.release(), rootFetch, collection,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ fassert(28536, execStatus);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
BSONArrayBuilder resultBuilder(result.subarrayStart("results"));
- for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) {
+ for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) {
resultBuilder.append(obj);
}
diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp
index 25af192a464..3159734c065 100644
--- a/src/mongo/db/exec/subplan.cpp
+++ b/src/mongo/db/exec/subplan.cpp
@@ -77,31 +77,6 @@ namespace mongo {
}
// static
- Status SubplanStage::make(OperationContext* txn,
- Collection* collection,
- WorkingSet* ws,
- const QueryPlannerParams& params,
- CanonicalQuery* cq,
- SubplanStage** out) {
- auto_ptr<SubplanStage> autoStage(new SubplanStage(txn, collection, ws, params, cq));
- // Plan each branch of the $or.
- Status planningStatus = autoStage->planSubqueries();
- if (!planningStatus.isOK()) {
- return planningStatus;
- }
-
- // Use the multi plan stage to select a winning plan for each branch, and then
- // construct the overall winning plan from the resulting index tags.
- Status multiPlanStatus = autoStage->pickBestPlan();
- if (!multiPlanStatus.isOK()) {
- return multiPlanStatus;
- }
-
- *out = autoStage.release();
- return Status::OK();
- }
-
- // static
bool SubplanStage::canUseSubplanning(const CanonicalQuery& query) {
const LiteParsedQuery& lpq = query.getParsed();
const MatchExpression* expr = query.root();
@@ -213,11 +188,7 @@ namespace mongo {
return Status::OK();
}
- Status SubplanStage::pickBestPlan() {
- // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
- // work that happens here, so this is needed for the time accounting to make sense.
- ScopedTimer timer(&_commonStats.executionTimeMillis);
-
+ Status SubplanStage::choosePlanForSubqueries(PlanYieldPolicy* yieldPolicy) {
// This is what we annotate with the index selections and then turn into a solution.
auto_ptr<OrMatchExpression> theOr(
static_cast<OrMatchExpression*>(_query->root()->shallowClone()));
@@ -277,9 +248,8 @@ namespace mongo {
_ws->clear();
- auto_ptr<MultiPlanStage> multiPlanStage(new MultiPlanStage(_txn,
- _collection,
- orChildCQ.get()));
+ _child.reset(new MultiPlanStage(_txn, _collection, orChildCQ.get()));
+ MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get());
// Dump all the solutions into the MPR.
for (size_t ix = 0; ix < solutions.size(); ++ix) {
@@ -294,7 +264,11 @@ namespace mongo {
multiPlanStage->addPlan(solutions[ix], nextPlanRoot, _ws);
}
- multiPlanStage->pickBestPlan();
+ Status planSelectStat = multiPlanStage->pickBestPlan(yieldPolicy);
+ if (!planSelectStat.isOK()) {
+ return planSelectStat;
+ }
+
if (!multiPlanStage->bestPlanChosen()) {
mongoutils::str::stream ss;
ss << "Failed to pick best plan for subchild "
@@ -303,7 +277,6 @@ namespace mongo {
}
QuerySolution* bestSoln = multiPlanStage->bestSolution();
- _child.reset(multiPlanStage.release());
// Check that we have good cache data. For example, we don't cache things
// for 2d indices.
@@ -372,12 +345,13 @@ namespace mongo {
// with stats obtained in the same fashion as a competitive ranking would have obtained
// them.
_ws->clear();
- auto_ptr<MultiPlanStage> multiPlanStage(new MultiPlanStage(_txn, _collection, _query));
+ _child.reset(new MultiPlanStage(_txn, _collection, _query));
+ MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get());
PlanStage* root;
verify(StageBuilder::build(_txn, _collection, *soln, _ws, &root));
multiPlanStage->addPlan(soln, root, _ws); // Takes ownership first two arguments.
- multiPlanStage->pickBestPlan();
+ multiPlanStage->pickBestPlan(yieldPolicy);
if (! multiPlanStage->bestPlanChosen()) {
mongoutils::str::stream ss;
ss << "Failed to pick best plan for subchild "
@@ -385,7 +359,87 @@ namespace mongo {
return Status(ErrorCodes::BadValue, ss);
}
- _child.reset(multiPlanStage.release());
+ return Status::OK();
+ }
+
+ Status SubplanStage::choosePlanWholeQuery(PlanYieldPolicy* yieldPolicy) {
+ // Clear out the working set. We'll start with a fresh working set.
+ _ws->clear();
+
+ // Use the query planning module to plan the whole query.
+ vector<QuerySolution*> solutions;
+ Status status = QueryPlanner::plan(*_query, _plannerParams, &solutions);
+ if (!status.isOK()) {
+ return Status(ErrorCodes::BadValue,
+ "error processing query: " + _query->toString() +
+ " planner returned error: " + status.reason());
+ }
+
+ // We cannot figure out how to answer the query. Perhaps it requires an index
+ // we do not have?
+ if (0 == solutions.size()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream()
+ << "error processing query: "
+ << _query->toString()
+ << " No query solutions");
+ }
+
+ if (1 == solutions.size()) {
+ PlanStage* root;
+ // Only one possible plan. Run it. Build the stages from the solution.
+ verify(StageBuilder::build(_txn, _collection, *solutions[0], _ws, &root));
+ _child.reset(root);
+ return Status::OK();
+ }
+ else {
+ // Many solutions. Create a MultiPlanStage to pick the best, update the cache,
+ // and so on. The working set will be shared by all candidate plans.
+ _child.reset(new MultiPlanStage(_txn, _collection, _query));
+ MultiPlanStage* multiPlanStage = static_cast<MultiPlanStage*>(_child.get());
+
+ for (size_t ix = 0; ix < solutions.size(); ++ix) {
+ if (solutions[ix]->cacheData.get()) {
+ solutions[ix]->cacheData->indexFilterApplied =
+ _plannerParams.indexFiltersApplied;
+ }
+
+                // version of StageBuilder::build when WorkingSet is shared
+ PlanStage* nextPlanRoot;
+ verify(StageBuilder::build(_txn, _collection, *solutions[ix], _ws,
+ &nextPlanRoot));
+
+ // Owns none of the arguments
+ multiPlanStage->addPlan(solutions[ix], nextPlanRoot, _ws);
+ }
+
+            // Delegate to the MultiPlanStage's plan selection facility.
+ Status planSelectStat = multiPlanStage->pickBestPlan(yieldPolicy);
+ if (!planSelectStat.isOK()) {
+ return planSelectStat;
+ }
+
+ return Status::OK();
+ }
+ }
+
+ Status SubplanStage::pickBestPlan(PlanYieldPolicy* yieldPolicy) {
+ // Adds the amount of time taken by pickBestPlan() to executionTimeMillis. There's lots of
+ // work that happens here, so this is needed for the time accounting to make sense.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ // Plan each branch of the $or.
+ Status subplanningStatus = planSubqueries();
+ if (!subplanningStatus.isOK()) {
+ return choosePlanWholeQuery(yieldPolicy);
+ }
+
+ // Use the multi plan stage to select a winning plan for each branch, and then construct
+ // the overall winning plan from the resulting index tags.
+ Status subplanSelectStat = choosePlanForSubqueries(yieldPolicy);
+ if (!subplanSelectStat.isOK()) {
+ return choosePlanWholeQuery(yieldPolicy);
+ }
return Status::OK();
}
diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h
index 477d6853194..a446eb746df 100644
--- a/src/mongo/db/exec/subplan.h
+++ b/src/mongo/db/exec/subplan.h
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/db/diskloc.h"
#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/query/query_solution.h"
@@ -51,18 +52,11 @@ namespace mongo {
*/
class SubplanStage : public PlanStage {
public:
- /**
- * Used to create SubplanStage instances. The caller owns the instance
- * returned through 'out'.
- *
- * 'out' is valid only if an OK status is returned.
- */
- static Status make(OperationContext* txn,
- Collection* collection,
- WorkingSet* ws,
- const QueryPlannerParams& params,
- CanonicalQuery* cq,
- SubplanStage** out);
+ SubplanStage(OperationContext* txn,
+ Collection* collection,
+ WorkingSet* ws,
+ const QueryPlannerParams& params,
+ CanonicalQuery* cq);
virtual ~SubplanStage();
@@ -87,13 +81,22 @@ namespace mongo {
static const char* kStageType;
- private:
- SubplanStage(OperationContext* txn,
- Collection* collection,
- WorkingSet* ws,
- const QueryPlannerParams& params,
- CanonicalQuery* cq);
+ /**
+ * Selects a plan using subplanning. First uses the query planning results from
+ * planSubqueries() and the multi plan stage to select the best plan for each branch.
+ *
+ * If this effort fails, then falls back on planning the whole query normally rather
+         * than planning $or branches independently.
+ *
+ * If 'yieldPolicy' is non-NULL, then all locks may be yielded in between round-robin
+ * works of the candidate plans. By default, 'yieldPolicy' is NULL and no yielding will
+ * take place.
+ *
+ * Returns a non-OK status if the plan was killed during yield or if planning fails.
+ */
+ Status pickBestPlan(PlanYieldPolicy* yieldPolicy);
+ private:
/**
* Plan each branch of the $or independently, and store the resulting
* lists of query solutions in '_solutions'.
@@ -107,8 +110,15 @@ namespace mongo {
/**
* Uses the query planning results from planSubqueries() and the multi plan stage
* to select the best plan for each branch.
+ *
+ * Helper for pickBestPlan().
+ */
+ Status choosePlanForSubqueries(PlanYieldPolicy* yieldPolicy);
+
+ /**
+ * Used as a fallback if subplanning fails. Helper for pickBestPlan().
*/
- Status pickBestPlan();
+ Status choosePlanWholeQuery(PlanYieldPolicy* yieldPolicy);
// transactional context for read locks. Not owned by us
OperationContext* _txn;
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
index 7e16ebd9b7a..c3a44531f7b 100644
--- a/src/mongo/db/exec/update.cpp
+++ b/src/mongo/db/exec/update.cpp
@@ -789,9 +789,40 @@ namespace mongo {
_child->saveState();
}
+ Status UpdateStage::restoreUpdateState(OperationContext* opCtx) {
+ const UpdateRequest& request = *_params.request;
+ const NamespaceString& nsString(request.getNamespaceString());
+
+ // We may have stepped down during the yield.
+ if (request.shouldCallLogOp() &&
+ !repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsString.db())) {
+ return Status(ErrorCodes::NotMaster,
+ str::stream() << "Demoted from primary while performing update on "
+ << nsString.ns());
+ }
+
+ if (request.getLifecycle()) {
+ UpdateLifecycle* lifecycle = request.getLifecycle();
+ lifecycle->setCollection(_collection);
+
+ if (!lifecycle->canContinue()) {
+ return Status(ErrorCodes::IllegalOperation,
+ "Update aborted due to invalid state transitions after yield.",
+ 17270);
+ }
+
+ _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(opCtx));
+ }
+
+ return Status::OK();
+ }
+
void UpdateStage::restoreState(OperationContext* opCtx) {
++_commonStats.unyields;
+ // Restore our child.
_child->restoreState(opCtx);
+ // Restore self.
+ uassertStatusOK(restoreUpdateState(opCtx));
}
void UpdateStage::invalidate(const DiskLoc& dl, InvalidationType type) {
diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h
index 862074e4001..b543ebb5fdf 100644
--- a/src/mongo/db/exec/update.h
+++ b/src/mongo/db/exec/update.h
@@ -125,6 +125,11 @@ namespace mongo {
*/
bool needInsert();
+ /**
+ * Helper for restoring the state of this update.
+ */
+ Status restoreUpdateState(OperationContext* opCtx);
+
UpdateStageParams _params;
// Not owned by us.
diff --git a/src/mongo/db/fts/fts_command_mongod.cpp b/src/mongo/db/fts/fts_command_mongod.cpp
index 08396d50d9c..e44e9327234 100644
--- a/src/mongo/db/fts/fts_command_mongod.cpp
+++ b/src/mongo/db/fts/fts_command_mongod.cpp
@@ -113,7 +113,11 @@ namespace mongo {
}
PlanExecutor* rawExec;
- Status getExecStatus = getExecutor(txn, ctx.getCollection(), cq, &rawExec);
+ Status getExecStatus = getExecutor(txn,
+ ctx.getCollection(),
+ cq,
+ PlanExecutor::YIELD_MANUAL,
+ &rawExec);
if (!getExecStatus.isOK()) {
errmsg = getExecStatus.reason();
return false;
diff --git a/src/mongo/db/ops/delete_executor.cpp b/src/mongo/db/ops/delete_executor.cpp
index 8e39be7f7a1..aca3a86bb97 100644
--- a/src/mongo/db/ops/delete_executor.cpp
+++ b/src/mongo/db/ops/delete_executor.cpp
@@ -126,6 +126,16 @@ namespace mongo {
str::stream() << "Not primary while removing from " << ns.ns());
}
+ // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set
+ // a manual yield policy.
+ const bool canYield = !_request->isGod() && (
+ _canonicalQuery.get() ?
+ !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) :
+ !LiteParsedQuery::isQueryIsolated(_request->getQuery()));
+
+ PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO :
+ PlanExecutor::YIELD_MANUAL;
+
PlanExecutor* rawExec;
Status getExecStatus = Status::OK();
if (_canonicalQuery.get()) {
@@ -137,6 +147,7 @@ namespace mongo {
_request->shouldCallLogOp(),
_request->isFromMigrate(),
_request->isExplain(),
+ policy,
&rawExec);
}
else {
@@ -149,6 +160,7 @@ namespace mongo {
_request->shouldCallLogOp(),
_request->isFromMigrate(),
_request->isExplain(),
+ policy,
&rawExec);
}
@@ -159,18 +171,6 @@ namespace mongo {
invariant(rawExec);
_exec.reset(rawExec);
- // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set
- // a manual yield policy.
- const bool canYield = !_request->isGod() && (
- _canonicalQuery.get() ?
- !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) :
- !LiteParsedQuery::isQueryIsolated(_request->getQuery()));
-
- PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO :
- PlanExecutor::YIELD_MANUAL;
-
- _exec->setYieldPolicy(policy);
-
return Status::OK();
}
diff --git a/src/mongo/db/ops/update_executor.cpp b/src/mongo/db/ops/update_executor.cpp
index 3673382b9bd..c5de729a45f 100644
--- a/src/mongo/db/ops/update_executor.cpp
+++ b/src/mongo/db/ops/update_executor.cpp
@@ -160,18 +160,28 @@ namespace mongo {
_driver.refreshIndexKeys(lifecycle->getIndexKeys(_request->getOpCtx()));
}
+ // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set
+ // a manual yield policy.
+ const bool canYield = !_request->isGod() && (
+ _canonicalQuery.get() ?
+ !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) :
+ !LiteParsedQuery::isQueryIsolated(_request->getQuery()));
+
+ PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO :
+ PlanExecutor::YIELD_MANUAL;
+
PlanExecutor* rawExec = NULL;
Status getExecStatus = Status::OK();
if (_canonicalQuery.get()) {
// This is the regular path for when we have a CanonicalQuery.
getExecStatus = getExecutorUpdate(_request->getOpCtx(), db, _canonicalQuery.release(),
- _request, &_driver, _opDebug, &rawExec);
+ _request, &_driver, _opDebug, policy, &rawExec);
}
else {
// This is the idhack fast-path for getting a PlanExecutor without doing the work
// to create a CanonicalQuery.
getExecStatus = getExecutorUpdate(_request->getOpCtx(), db, nsString.ns(), _request,
- &_driver, _opDebug, &rawExec);
+ &_driver, _opDebug, policy, &rawExec);
}
if (!getExecStatus.isOK()) {
@@ -181,18 +191,6 @@ namespace mongo {
invariant(rawExec);
_exec.reset(rawExec);
- // If yielding is allowed for this plan, then set an auto yield policy. Otherwise set
- // a manual yield policy.
- const bool canYield = !_request->isGod() && (
- _canonicalQuery.get() ?
- !QueryPlannerCommon::hasNode(_canonicalQuery->root(), MatchExpression::ATOMIC) :
- !LiteParsedQuery::isQueryIsolated(_request->getQuery()));
-
- PlanExecutor::YieldPolicy policy = canYield ? PlanExecutor::YIELD_AUTO :
- PlanExecutor::YIELD_MANUAL;
-
- _exec->setYieldPolicy(policy);
-
return Status::OK();
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 48f4e87bd87..dd0a57d8f8e 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -177,8 +177,14 @@ namespace {
projectionForQuery,
&cq,
whereCallback);
+
PlanExecutor* rawExec;
- if (status.isOK() && getExecutor(txn, collection, cq, &rawExec, runnerOptions).isOK()) {
+ if (status.isOK() && getExecutor(txn,
+ collection,
+ cq,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ runnerOptions).isOK()) {
// success: The PlanExecutor will handle sorting for us using an index.
exec.reset(rawExec);
sortInRunner = true;
@@ -203,15 +209,19 @@ namespace {
whereCallback));
PlanExecutor* rawExec;
- uassertStatusOK(getExecutor(txn, collection, cq, &rawExec, runnerOptions));
+ uassertStatusOK(getExecutor(txn,
+ collection,
+ cq,
+ PlanExecutor::YIELD_AUTO,
+ &rawExec,
+ runnerOptions));
exec.reset(rawExec);
}
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
- // pass "false" here to indicate that the PlanExecutor should not register itself: instead
- // the output PlanExecutor will get registered with a ClientCursor.
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO, false);
+ // deregister the PlanExecutor so that it can be registered with ClientCursor.
+ exec->deregisterExec();
exec->saveState();
// Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 26db3998308..8c474c0f948 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -301,19 +301,10 @@ namespace mongo {
&& SubplanStage::canUseSubplanning(*canonicalQuery)) {
QLOG() << "Running query as sub-queries: " << canonicalQuery->toStringShort();
+ LOG(2) << "Running query as sub-queries: " << canonicalQuery->toStringShort();
- SubplanStage* subPlan;
- Status subplanStatus = SubplanStage::make(opCtx, collection, ws, plannerParams,
- canonicalQuery, &subPlan);
- if (subplanStatus.isOK()) {
- LOG(2) << "Running query as sub-queries: " << canonicalQuery->toStringShort();
- *rootOut = subPlan;
- return Status::OK();
- }
- else {
- QLOG() << "Subplanner: " << subplanStatus.reason();
- // Fall back on non-subplan execution.
- }
+ *rootOut = new SubplanStage(opCtx, collection, ws, plannerParams, canonicalQuery);
+ return Status::OK();
}
vector<QuerySolution*> solutions;
@@ -388,9 +379,6 @@ namespace mongo {
multiPlanStage->addPlan(solutions[ix], nextPlanRoot, ws);
}
- // Do the plan selection up front.
- multiPlanStage->pickBestPlan();
-
*rootOut = multiPlanStage;
return Status::OK();
}
@@ -401,6 +389,7 @@ namespace mongo {
Status getExecutor(OperationContext* txn,
Collection* collection,
CanonicalQuery* rawCanonicalQuery,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out,
size_t plannerOptions) {
auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery);
@@ -415,15 +404,15 @@ namespace mongo {
invariant(root);
// We must have a tree of stages in order to have a valid plan executor, but the query
// solution may be null.
- *out = new PlanExecutor(txn, ws.release(), root, querySolution, canonicalQuery.release(),
- collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, querySolution, canonicalQuery.release(),
+ collection, yieldPolicy, out);
}
Status getExecutor(OperationContext* txn,
Collection* collection,
const std::string& ns,
const BSONObj& unparsedQuery,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out,
size_t plannerOptions) {
if (!collection) {
@@ -431,8 +420,7 @@ namespace mongo {
<< " Using EOF stage: " << unparsedQuery.toString();
EOFStage* eofStage = new EOFStage();
WorkingSet* ws = new WorkingSet();
- *out = new PlanExecutor(txn, ws, eofStage, ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws, eofStage, ns, yieldPolicy, out);
}
if (!CanonicalQuery::isSimpleIdQuery(unparsedQuery) ||
@@ -446,7 +434,7 @@ namespace mongo {
return status;
// Takes ownership of 'cq'.
- return getExecutor(txn, collection, cq, out, plannerOptions);
+ return getExecutor(txn, collection, cq, yieldPolicy, out, plannerOptions);
}
LOG(2) << "Using idhack: " << unparsedQuery.toString();
@@ -460,8 +448,7 @@ namespace mongo {
root);
}
- *out = new PlanExecutor(txn, ws, root, collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws, root, collection, yieldPolicy, out);
}
//
@@ -475,6 +462,7 @@ namespace mongo {
bool shouldCallLogOp,
bool fromMigrate,
bool isExplain,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out) {
auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery);
auto_ptr<WorkingSet> ws(new WorkingSet());
@@ -495,9 +483,8 @@ namespace mongo {
root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, root);
// We must have a tree of stages in order to have a valid plan executor, but the query
// solution may be null.
- *out = new PlanExecutor(txn, ws.release(), root, querySolution, canonicalQuery.release(),
- collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, querySolution, canonicalQuery.release(),
+ collection, yieldPolicy, out);
}
Status getExecutorDelete(OperationContext* txn,
@@ -508,6 +495,7 @@ namespace mongo {
bool shouldCallLogOp,
bool fromMigrate,
bool isExplain,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out) {
auto_ptr<WorkingSet> ws(new WorkingSet());
DeleteStageParams deleteStageParams;
@@ -523,8 +511,7 @@ namespace mongo {
<< " Using EOF stage: " << unparsedQuery.toString();
DeleteStage* deleteStage = new DeleteStage(txn, deleteStageParams, ws.get(), NULL,
new EOFStage());
- *out = new PlanExecutor(txn, ws.release(), deleteStage, ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), deleteStage, ns, yieldPolicy, out);
}
if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) &&
@@ -535,8 +522,7 @@ namespace mongo {
ws.get());
DeleteStage* root = new DeleteStage(txn, deleteStageParams, ws.get(), collection,
idHackStage);
- *out = new PlanExecutor(txn, ws.release(), root, collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, collection, yieldPolicy, out);
}
const WhereCallbackReal whereCallback(txn, collection->ns().db());
@@ -551,7 +537,7 @@ namespace mongo {
// Takes ownership of 'cq'.
return getExecutorDelete(txn, collection, cq, isMulti, shouldCallLogOp,
- fromMigrate, isExplain, out);
+ fromMigrate, isExplain, yieldPolicy, out);
}
//
@@ -564,6 +550,7 @@ namespace mongo {
const UpdateRequest* request,
UpdateDriver* driver,
OpDebug* opDebug,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut) {
auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery);
auto_ptr<WorkingSet> ws(new WorkingSet());
@@ -584,13 +571,14 @@ namespace mongo {
root = new UpdateStage(updateStageParams, ws.get(), db, root);
// We must have a tree of stages in order to have a valid plan executor, but the query
// solution may be null. Takes ownership of all args other than 'collection' and 'txn'
- *execOut = new PlanExecutor(txn,
- ws.release(),
- root,
- querySolution,
- canonicalQuery.release(),
- collection);
- return Status::OK();
+ return PlanExecutor::make(txn,
+ ws.release(),
+ root,
+ querySolution,
+ canonicalQuery.release(),
+ collection,
+ yieldPolicy,
+ execOut);
}
Status getExecutorUpdate(OperationContext* txn,
@@ -599,6 +587,7 @@ namespace mongo {
const UpdateRequest* request,
UpdateDriver* driver,
OpDebug* opDebug,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut) {
auto_ptr<WorkingSet> ws(new WorkingSet());
Collection* collection = db->getCollection(request->getOpCtx(),
@@ -614,8 +603,7 @@ namespace mongo {
<< " Using EOF stage: " << unparsedQuery.toString();
UpdateStage* updateStage = new UpdateStage(updateStageParams, ws.get(), db,
new EOFStage());
- *execOut = new PlanExecutor(txn, ws.release(), updateStage, ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), updateStage, ns, yieldPolicy, execOut);
}
if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) &&
@@ -625,8 +613,7 @@ namespace mongo {
PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(),
ws.get());
UpdateStage* root = new UpdateStage(updateStageParams, ws.get(), db, idHackStage);
- *execOut = new PlanExecutor(txn, ws.release(), root, collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, collection, yieldPolicy, execOut);
}
const WhereCallbackReal whereCallback(txn, collection->ns().db());
@@ -640,7 +627,7 @@ namespace mongo {
return status;
// Takes ownership of 'cq'.
- return getExecutorUpdate(txn, db, cq, request, driver, opDebug, execOut);
+ return getExecutorUpdate(txn, db, cq, request, driver, opDebug, yieldPolicy, execOut);
}
//
@@ -650,6 +637,7 @@ namespace mongo {
Status getExecutorGroup(OperationContext* txn,
Collection* collection,
const GroupRequest& request,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut) {
if (!globalScriptEngine) {
return Status(ErrorCodes::BadValue, "server-side JavaScript execution is disabled");
@@ -664,8 +652,7 @@ namespace mongo {
// reporting machinery always assumes that the root stage for a group operation is a
// GroupStage, so in this case we put a GroupStage on top of an EOFStage.
root = new GroupStage(txn, request, ws.get(), new EOFStage());
- *execOut = new PlanExecutor(txn, ws.release(), root, request.ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut);
}
const NamespaceString nss(request.ns);
@@ -692,13 +679,14 @@ namespace mongo {
root = new GroupStage(txn, request, ws.get(), root);
// We must have a tree of stages in order to have a valid plan executor, but the query
// solution may be null. Takes ownership of all args other than 'collection'.
- *execOut = new PlanExecutor(txn,
- ws.release(),
- root,
- querySolution,
- canonicalQuery.release(),
- collection);
- return Status::OK();
+ return PlanExecutor::make(txn,
+ ws.release(),
+ root,
+ querySolution,
+ canonicalQuery.release(),
+ collection,
+ yieldPolicy,
+ execOut);
}
//
@@ -880,6 +868,7 @@ namespace mongo {
Status getExecutorCount(OperationContext* txn,
Collection* collection,
const CountRequest& request,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut) {
auto_ptr<WorkingSet> ws(new WorkingSet());
PlanStage* root;
@@ -890,8 +879,7 @@ namespace mongo {
// reporting machinery always assumes that the root stage for a count operation is
// a CountStage, so in this case we put a CountStage on top of an EOFStage.
root = new CountStage(txn, collection, request, ws.get(), new EOFStage());
- *execOut = new PlanExecutor(txn, ws.release(), root, request.ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut);
}
if (request.query.isEmpty()) {
@@ -899,8 +887,7 @@ namespace mongo {
// for its number of records. This is implemented by the CountStage, and we don't need
// to create a child for the count stage in this case.
root = new CountStage(txn, collection, request, ws.get(), NULL);
- *execOut = new PlanExecutor(txn, ws.release(), root, request.ns);
- return Status::OK();
+ return PlanExecutor::make(txn, ws.release(), root, request.ns, yieldPolicy, execOut);
}
const WhereCallbackReal whereCallback(txn, collection->ns().db());
@@ -937,14 +924,14 @@ namespace mongo {
root = new CountStage(txn, collection, request, ws.get(), root);
// We must have a tree of stages in order to have a valid plan executor, but the query
// solution may be NULL. Takes ownership of all args other than 'collection' and 'txn'
- *execOut = new PlanExecutor(txn,
- ws.release(),
- root,
- querySolution,
- cq.release(),
- collection);
-
- return Status::OK();
+ return PlanExecutor::make(txn,
+ ws.release(),
+ root,
+ querySolution,
+ cq.release(),
+ collection,
+ yieldPolicy,
+ execOut);
}
//
@@ -1004,6 +991,7 @@ namespace mongo {
Collection* collection,
const BSONObj& query,
const std::string& field,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out) {
// This should'a been checked by the distinct command.
invariant(collection);
@@ -1050,7 +1038,7 @@ namespace mongo {
}
// Takes ownership of 'cq'.
- return getExecutor(txn, collection, cq, out);
+ return getExecutor(txn, collection, cq, yieldPolicy, out);
}
//
@@ -1102,15 +1090,15 @@ namespace mongo {
<< ", planSummary: " << Explain::getPlanSummary(root);
// Takes ownership of its arguments (except for 'collection').
- *out = new PlanExecutor(txn, ws, root, soln, autoCq.release(), collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws, root, soln, autoCq.release(), collection,
+ yieldPolicy, out);
}
// See if we can answer the query in a fast-distinct compatible fashion.
vector<QuerySolution*> solutions;
status = QueryPlanner::plan(*cq, plannerParams, &solutions);
if (!status.isOK()) {
- return getExecutor(txn, collection, autoCq.release(), out);
+ return getExecutor(txn, collection, autoCq.release(), yieldPolicy, out);
}
// We look for a solution that has an ixscan we can turn into a distinctixscan
@@ -1131,9 +1119,9 @@ namespace mongo {
LOG(2) << "Using fast distinct: " << cq->toStringShort()
<< ", planSummary: " << Explain::getPlanSummary(root);
- // Takes ownership of its arguments (except for 'collection').
- *out = new PlanExecutor(txn, ws, root, solutions[i], autoCq.release(), collection);
- return Status::OK();
+ // Takes ownership of 'ws', 'root', 'solutions[i]', and 'autoCq'.
+ return PlanExecutor::make(txn, ws, root, solutions[i], autoCq.release(),
+ collection, yieldPolicy, out);
}
}
@@ -1153,7 +1141,7 @@ namespace mongo {
autoCq.reset(cq);
// Takes ownership of 'autoCq'.
- return getExecutor(txn, collection, autoCq.release(), out);
+ return getExecutor(txn, collection, autoCq.release(), yieldPolicy, out);
}
} // namespace mongo
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 60d015f93d0..86e8156dfd6 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -71,6 +71,7 @@ namespace mongo {
Status getExecutor(OperationContext* txn,
Collection* collection,
CanonicalQuery* rawCanonicalQuery,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out,
size_t plannerOptions = 0);
@@ -90,6 +91,7 @@ namespace mongo {
Collection* collection,
const std::string& ns,
const BSONObj& unparsedQuery,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out,
size_t plannerOptions = 0);
@@ -113,6 +115,7 @@ namespace mongo {
Collection* collection,
const BSONObj& query,
const std::string& field,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** out);
/*
@@ -125,6 +128,7 @@ namespace mongo {
Status getExecutorCount(OperationContext* txn,
Collection* collection,
const CountRequest& request,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
//
@@ -149,6 +153,7 @@ namespace mongo {
bool shouldCallLogOp,
bool fromMigrate,
bool isExplain,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
/**
@@ -168,6 +173,7 @@ namespace mongo {
bool shouldCallLogOp,
bool fromMigrate,
bool isExplain,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
//
@@ -191,6 +197,7 @@ namespace mongo {
const UpdateRequest* request,
UpdateDriver* driver,
OpDebug* opDebug,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
/**
@@ -210,6 +217,7 @@ namespace mongo {
const UpdateRequest* request,
UpdateDriver* driver,
OpDebug* opDebug,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
//
@@ -230,6 +238,7 @@ namespace mongo {
Status getExecutorGroup(OperationContext* txn,
Collection* collection,
const GroupRequest& request,
+ PlanExecutor::YieldPolicy yieldPolicy,
PlanExecutor** execOut);
} // namespace mongo
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index c16aa251f2d..398c2ccbbb5 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -73,7 +73,16 @@ namespace mongo {
if (NULL == collection) {
EOFStage* eof = new EOFStage();
- return new PlanExecutor(txn, ws, eof, ns.toString());
+ PlanExecutor* exec;
+                // Takes ownership of 'ws' and 'eof'.
+ Status execStatus = PlanExecutor::make(txn,
+ ws,
+ eof,
+ ns.toString(),
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
+ invariant(execStatus.isOK());
+ return exec;
}
dassert( ns == collection->ns().ns() );
@@ -90,8 +99,16 @@ namespace mongo {
}
CollectionScan* cs = new CollectionScan(txn, params, ws, NULL);
+ PlanExecutor* exec;
// Takes ownership of 'ws' and 'cs'.
- return new PlanExecutor(txn, ws, cs, collection);
+ Status execStatus = PlanExecutor::make(txn,
+ ws,
+ cs,
+ collection,
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
+ invariant(execStatus.isOK());
+ return exec;
}
/**
@@ -123,7 +140,16 @@ namespace mongo {
root = new FetchStage(txn, ws, root, NULL, collection);
}
- return new PlanExecutor(txn, ws, root, collection);
+ PlanExecutor* exec;
+ // Takes ownership of 'ws' and 'root'.
+ Status execStatus = PlanExecutor::make(txn,
+ ws,
+ root,
+ collection,
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
+ invariant(execStatus.isOK());
+ return exec;
}
};
diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp
index e091193dc37..89d724c4c30 100644
--- a/src/mongo/db/query/new_find.cpp
+++ b/src/mongo/db/query/new_find.cpp
@@ -455,8 +455,12 @@ namespace mongo {
WorkingSet* oplogws = new WorkingSet();
OplogStart* stage = new OplogStart(txn, collection, tsExpr, oplogws);
+ PlanExecutor* rawExec;
// Takes ownership of ws and stage.
- scoped_ptr<PlanExecutor> exec(new PlanExecutor(txn, oplogws, stage, collection));
+ Status execStatus = PlanExecutor::make(txn, oplogws, stage, collection,
+ PlanExecutor::YIELD_AUTO, &rawExec);
+ invariant(execStatus.isOK());
+ scoped_ptr<PlanExecutor> exec(rawExec);
// The stage returns a DiskLoc of where to start.
DiskLoc startLoc;
@@ -464,7 +468,8 @@ namespace mongo {
// This is normal. The start of the oplog is the beginning of the collection.
if (PlanExecutor::IS_EOF == state) {
- return getExecutor(txn, collection, autoCq.release(), execOut);
+ return getExecutor(txn, collection, autoCq.release(), PlanExecutor::YIELD_AUTO,
+ execOut);
}
// This is not normal. An error was encountered.
@@ -485,8 +490,8 @@ namespace mongo {
WorkingSet* ws = new WorkingSet();
CollectionScan* cs = new CollectionScan(txn, params, ws, cq->root());
// Takes ownership of 'ws', 'cs', and 'cq'.
- *execOut = new PlanExecutor(txn, ws, cs, autoCq.release(), collection);
- return Status::OK();
+ return PlanExecutor::make(txn, ws, cs, autoCq.release(), collection,
+ PlanExecutor::YIELD_AUTO, execOut);
}
std::string newRunQuery(OperationContext* txn,
@@ -580,15 +585,7 @@ namespace mongo {
// Otherwise we go through the selection of which executor is most suited to the
// query + run-time context at hand.
Status status = Status::OK();
- if (collection == NULL) {
- LOG(2) << "Collection " << ns << " does not exist."
- << " Using EOF stage: " << cq->toStringShort();
- EOFStage* eofStage = new EOFStage();
- WorkingSet* ws = new WorkingSet();
- // Takes ownership of 'cq'.
- rawExec = new PlanExecutor(txn, ws, eofStage, cq, NULL);
- }
- else if (pq.getOptions().oplogReplay) {
+ if (NULL != collection && pq.getOptions().oplogReplay) {
// Takes ownership of 'cq'.
status = getOplogStartHack(txn, collection, cq, &rawExec);
}
@@ -598,7 +595,7 @@ namespace mongo {
options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
// Takes ownership of 'cq'.
- status = getExecutor(txn, collection, cq, &rawExec, options);
+ status = getExecutor(txn, collection, cq, PlanExecutor::YIELD_AUTO, &rawExec, options);
}
if (!status.isOK()) {
@@ -609,11 +606,6 @@ namespace mongo {
verify(NULL != rawExec);
auto_ptr<PlanExecutor> exec(rawExec);
- // We want the PlanExecutor to yield automatically, but we handle registration of the
- // executor ourselves. We want to temporarily register the executor while we are generating
- // this batch of results, and then unregister and re-register with ClientCursor for getmore.
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
// If it's actually an explain, do the explain and return rather than falling through
// to the normal query execution loop.
if (pq.isExplain()) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 601091a567b..cca412cac8a 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -29,9 +29,11 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
+#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/plan_yield_policy.h"
@@ -40,46 +42,93 @@
namespace mongo {
- PlanExecutor::PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- const Collection* collection)
- : _opCtx(opCtx),
- _collection(collection),
- _cq(NULL),
- _workingSet(ws),
- _qs(NULL),
- _root(rt),
- _killed(false) {
- initNs();
+ namespace {
+
+ /**
+ * Retrieves the first stage of a given type from the plan tree, or NULL
+ * if no such stage is found.
+ */
+ PlanStage* getStageByType(PlanStage* root, StageType type) {
+ if (root->stageType() == type) {
+ return root;
+ }
+
+ vector<PlanStage*> children = root->getChildren();
+ for (size_t i = 0; i < children.size(); i++) {
+ PlanStage* result = getStageByType(children[i], type);
+ if (result) {
+ return result;
+ }
+ }
+
+ return NULL;
+ }
+
}
- PlanExecutor::PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- std::string ns)
- : _opCtx(opCtx),
- _collection(NULL),
- _cq(NULL),
- _workingSet(ws),
- _qs(NULL),
- _root(rt),
- _ns(ns),
- _killed(false) { }
+ // static
+ Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, collection, "", yieldPolicy, out);
+ }
- PlanExecutor::PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- CanonicalQuery* cq,
- const Collection* collection)
- : _opCtx(opCtx),
- _collection(collection),
- _cq(cq),
- _workingSet(ws),
- _qs(NULL),
- _root(rt),
- _killed(false) {
- initNs();
+ // static
+ Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, NULL, ns, yieldPolicy, out);
+ }
+
+ // static
+ Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, cq, collection, "", yieldPolicy, out);
+ }
+
+ // static
+ Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, qs, cq, collection, "", yieldPolicy, out);
+ }
+
+ // static
+ Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ std::auto_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, ws, rt, qs, cq, collection, ns));
+
+ // Perform plan selection, if necessary.
+ Status status = exec->pickBestPlan(yieldPolicy);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ *out = exec.release();
+ return Status::OK();
}
PlanExecutor::PlanExecutor(OperationContext* opCtx,
@@ -87,18 +136,22 @@ namespace mongo {
PlanStage* rt,
QuerySolution* qs,
CanonicalQuery* cq,
- const Collection* collection)
+ const Collection* collection,
+ const std::string& ns)
: _opCtx(opCtx),
_collection(collection),
_cq(cq),
_workingSet(ws),
_qs(qs),
_root(rt),
+ _ns(ns),
_killed(false) {
- initNs();
- }
+ // We may still need to initialize _ns from either _collection or _cq.
+ if (!_ns.empty()) {
+ // We already have an _ns set, so there's nothing more to do.
+ return;
+ }
- void PlanExecutor::initNs() {
if (NULL != _collection) {
_ns = _collection->ns().ns();
}
@@ -108,6 +161,31 @@ namespace mongo {
}
}
+ Status PlanExecutor::pickBestPlan(YieldPolicy policy) {
+ // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
+ // register it to receive notifications.
+ this->setYieldPolicy(policy);
+
+ // First check if we need to do subplanning.
+ PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN);
+ if (foundStage) {
+ SubplanStage* subplan = static_cast<SubplanStage*>(foundStage);
+ return subplan->pickBestPlan(_yieldPolicy.get());
+ }
+
+ // If we didn't have to do subplanning, we might still have to do regular
+ // multi plan selection.
+ foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN);
+ if (foundStage) {
+ MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage);
+ return mps->pickBestPlan(_yieldPolicy.get());
+ }
+
+ // Either we chose a plan, or no plan selection was required. In both cases,
+ // our work has been successfully completed.
+ return Status::OK();
+ }
+
PlanExecutor::~PlanExecutor() { }
// static
@@ -180,6 +258,14 @@ namespace mongo {
if (_killed) { return PlanExecutor::DEAD; }
for (;;) {
+ // Yield if it's time to yield.
+ if (NULL != _yieldPolicy.get() && _yieldPolicy->shouldYield()) {
+ _yieldPolicy->yield();
+ if (_killed) {
+ return PlanExecutor::DEAD;
+ }
+ }
+
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState code = _root->work(&id);
@@ -282,19 +368,22 @@ namespace mongo {
}
Status PlanExecutor::executePlan() {
- WorkingSetID id = WorkingSet::INVALID_ID;
- PlanStage::StageState code = PlanStage::NEED_TIME;
- while (PlanStage::NEED_TIME == code || PlanStage::ADVANCED == code) {
- code = _root->work(&id);
+ BSONObj obj;
+ PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
+ while (PlanExecutor::ADVANCED == state) {
+ state = this->getNext(&obj, NULL);
}
- if (PlanStage::FAILURE == code) {
- BSONObj obj;
- WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &obj);
- return Status(ErrorCodes::BadValue,
- "Exec error: " + WorkingSetCommon::toStatusString(obj));
+ if (PlanExecutor::DEAD == state) {
+ return Status(ErrorCodes::OperationFailed, "Exec error: PlanExecutor killed");
+ }
+ else if (PlanExecutor::EXEC_ERROR == state) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Exec error: "
+ << WorkingSetCommon::toStatusString(obj));
}
+ invariant(PlanExecutor::IS_EOF == state);
return Status::OK();
}
@@ -308,7 +397,7 @@ namespace mongo {
}
else {
invariant(PlanExecutor::YIELD_AUTO == policy);
- _yieldPolicy.reset(new PlanYieldPolicy());
+ _yieldPolicy.reset(new PlanYieldPolicy(this));
// Runners that yield automatically generally need to be registered so that
// after yielding, they receive notifications of events like deletions and
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index db616fc7281..f7da5783174 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -89,27 +89,35 @@ namespace mongo {
//
// 1. Register your PlanExecutor with ClientCursor. Registered executors are informed
// about DiskLoc deletions and namespace invalidation, as well as other important
- // events. Do this either by calling registerExec() on the executor. This can be done
- // once you get your executor, or could be done per-yield.
+ // events. Do this by calling registerExec() on the executor. Alternatively, this can
+ // be done per-yield (as described below).
//
- // 2. Call exec->saveState() before you yield.
+ // 2. Construct a PlanYieldPolicy 'policy', passing 'exec' to the constructor.
//
- // 3. Call Yield::yieldAllLocks(), passing in the executor's OperationContext*. This
- // causes the executor to give up its locks and block so that it goes to the back of
- // the scheduler's queue.
+ // 3. Call PlanYieldPolicy::yield() on 'policy'. If your PlanExecutor is not yet
+ // registered (because you want to register on a per-yield basis), then pass
+ // 'true' to yield().
//
- // 4. Call exec->restoreState() before using the executor again.
- //
- // 5. The next call to exec->getNext() may return DEAD.
- //
- // 6. Make sure the executor gets deregistered from ClientCursor. PlanExecutor does
- // this in an RAII fashion when it is destroyed, or you can explicity call
- // unregisterExec() on the PlanExecutor.
+ // 4. The call to yield() returns a boolean indicating whether or not 'exec' is
+ // still alive. If it is false, then 'exec' was killed during the yield and is
+ // no longer valid.
YIELD_MANUAL,
};
//
- // Constructors / destructor.
+ // Factory methods.
+ //
+ // On success, return a new PlanExecutor, owned by the caller, through 'out'.
+ //
+ // Passing YIELD_AUTO to any of these factories will construct a yielding runner which
+ // may yield in the following circumstances:
+ // 1) During plan selection inside the call to make().
+ // 2) On any call to getNext().
+ // 3) While executing the plan inside executePlan().
+ //
+ // The runner will also be automatically registered to receive notifications in the
+ // case of YIELD_AUTO, so no further calls to registerExec() or setYieldPolicy() are
+ // necessary.
//
/**
@@ -118,40 +126,48 @@ namespace mongo {
* Right now this is only for idhack updates which neither canonicalize
* nor go through normal planning.
*/
- PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- const Collection* collection);
+ static Status make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out);
/**
* Used when we have a NULL collection and no canonical query. In this case,
* we need to explicitly pass a namespace to the plan executor.
*/
- PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- std::string ns);
+ static Status make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out);
/**
* Used when there is a canonical query but no query solution (e.g. idhack
* queries, queries against a NULL collection, queries using the subplan stage).
*/
- PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- CanonicalQuery* cq,
- const Collection* collection);
+ static Status make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out);
/**
* The constructor for the normal case, when you have both a canonical query
* and a query solution.
*/
- PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- QuerySolution* qs,
- CanonicalQuery* cq,
- const Collection* collection);
+ static Status make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out);
~PlanExecutor();
@@ -228,6 +244,8 @@ namespace mongo {
* For read operations, objOut or dlOut are populated with another query result.
*
* For write operations, the return depends on the particulars of the write stage.
+ *
+ * If a YIELD_AUTO policy is set, then this method may yield.
*/
ExecState getNext(BSONObj* objOut, DiskLoc* dlOut);
@@ -242,6 +260,8 @@ namespace mongo {
/**
* Execute the plan to completion, throwing out the results. Used when you want to work the
* underlying tree without getting results back.
+ *
+ * If a YIELD_AUTO policy is set on this executor, then this will automatically yield.
*/
Status executePlan();
@@ -315,9 +335,42 @@ namespace mongo {
};
/**
- * Initialize the namespace using either the canonical query or the collection.
+ * New PlanExecutor instances are created with the static make() methods above.
+ */
+ PlanExecutor(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ const std::string& ns);
+
+ /**
+ * Public factory methods delegate to this private factory to do their work.
+ */
+ static Status make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out);
+
+ /**
+ * Clients of PlanExecutor expect that on receiving a new instance from one of the make()
+ * factory methods, plan selection has already been completed. In order to enforce this
+ * property, this function is called to do plan selection prior to returning the new
+ * PlanExecutor.
+ *
+ * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage,
+ * this calls into their underlying plan selection facilities. Otherwise, does nothing.
+ *
+ * If a YIELD_AUTO policy is set (and document-level locking is not supported), then
+ * locks are yielded during plan selection.
*/
- void initNs();
+ Status pickBestPlan(YieldPolicy policy);
// The OperationContext that we're executing within. We need this in order to release
// locks.
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index f8db2c1e98a..689a76973b5 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -31,61 +31,52 @@
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/concurrency/yield.h"
+#include "mongo/db/global_environment_experiment.h"
namespace mongo {
// Yield every 128 cycles or 10ms. These values were inherited from v2.6, which just copied
// them from v2.4.
- PlanYieldPolicy::PlanYieldPolicy()
+ PlanYieldPolicy::PlanYieldPolicy(PlanExecutor* exec)
: _elapsedTracker(128, 10),
- _planYielding(NULL) { }
-
- PlanYieldPolicy::~PlanYieldPolicy() {
- if (NULL != _planYielding) {
- // We were destructed mid-yield. Since we're being used to yield a runner, we have
- // to deregister the runner.
- if (_planYielding->collection()) {
- _planYielding->collection()->cursorCache()->deregisterExecutor(_planYielding);
- }
- }
- }
-
- /**
- * Yield the provided runner, registering and deregistering it appropriately. Deal with
- * deletion during a yield by setting _runnerYielding to ensure deregistration.
- *
- * Provided runner MUST be YIELD_MANUAL.
- */
- bool PlanYieldPolicy::yieldAndCheckIfOK(PlanExecutor* plan) {
- invariant(plan);
- invariant(plan->collection());
+ _planYielding(exec) { }
- // If micros > 0, we should yield.
- plan->saveState();
+ bool PlanYieldPolicy::shouldYield() {
+ return _elapsedTracker.intervalHasElapsed();
+ }
- // If we're destructed during yield this will be used to deregister ourselves.
- // This happens when we're not in a ClientCursor and somebody kills all cursors
- // on the ns we're operating on.
- _planYielding = plan;
+ bool PlanYieldPolicy::yield(bool registerPlan) {
+ // This is a no-op if document-level locking is supported. Doc-level locking systems
+ // should not need to yield.
+ if (supportsDocLocking()) {
+ return true;
+ }
- // Register with the thing that may kill() the 'plan'.
- plan->collection()->cursorCache()->registerExecutor(plan);
+ // No need to yield if the collection is NULL.
+ if (NULL == _planYielding->collection()) {
+ return true;
+ }
- // Note that this call checks for interrupt, and thus can throw if interrupt flag is set.
- Yield::yieldAllLocks(plan->getOpCtx(), 1);
+ invariant(_planYielding);
- // If the plan was killed, runner->collection() will return NULL, and we can't/don't
- // deregister it.
- if (plan->collection()) {
- plan->collection()->cursorCache()->deregisterExecutor(plan);
+ if (registerPlan) {
+ _planYielding->registerExec();
}
- _planYielding = NULL;
+ OperationContext* opCtx = _planYielding->getOpCtx();
+ invariant(opCtx);
+
+ _planYielding->saveState();
+ // Note that this call checks for interrupt, and thus can throw if interrupt flag is set.
+ Yield::yieldAllLocks(opCtx, 1);
_elapsedTracker.resetLastTime();
- return plan->restoreState(plan->getOpCtx());
+ if (registerPlan) {
+ _planYielding->deregisterExec();
+ }
+
+ return _planYielding->restoreState(opCtx);
}
} // namespace mongo
-
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index cb6d1a0a9ec..50aff9656f9 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -29,26 +29,43 @@
#pragma once
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/query/plan_executor.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
class PlanYieldPolicy {
public:
- PlanYieldPolicy();
- ~PlanYieldPolicy();
+ explicit PlanYieldPolicy(PlanExecutor* exec);
+
+ /**
+ * Used by AUTO_YIELD plan executors in order to check whether it is time to yield.
+ * PlanExecutors give up their locks periodically in order to be fair to other
+ * threads.
+ */
+ bool shouldYield();
/**
- * Yield the provided runner, registering and deregistering it appropriately. Deal with
- * deletion during a yield by setting _planYielding to ensure deregistration.
+ * Used to cause a plan executor to give up locks and go to sleep. The PlanExecutor
+ * must *not* be in saved state. Handles calls to save/restore state internally.
*
- * Provided plan executor MUST be YIELD_MANUAL.
+ * By default, assumes that the PlanExecutor is already registered. If 'registerPlan'
+ * is explicitly set to true, then the executor will get automatically registered and
+ * deregistered here.
+ *
+ * Returns true if the executor was restored successfully and is still alive. Returns false
+ * if the executor got killed during yield.
*/
- bool yieldAndCheckIfOK(PlanExecutor* plan);
+ bool yield(bool registerPlan = false);
private:
+ // Default constructor disallowed in order to ensure initialization of '_planYielding'.
+ PlanYieldPolicy();
+
ElapsedTracker _elapsedTracker;
+ // The plan executor which this yield policy is responsible for yielding. Must
+ // not outlive the plan executor.
PlanExecutor* _planYielding;
};
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 861d9dee426..0dc26fde8b2 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -223,7 +223,6 @@ namespace mongo {
n = deleteObjects( txn, ctx.ctx().db(), ns, query, false, true );
ttlDeletedDocuments.increment( n );
- ctx.commit();
}
LOG(1) << "\tTTL deleted: " << n << endl;
diff --git a/src/mongo/dbtests/clienttests.cpp b/src/mongo/dbtests/clienttests.cpp
index 01584aed69d..0b26a83e208 100644
--- a/src/mongo/dbtests/clienttests.cpp
+++ b/src/mongo/dbtests/clienttests.cpp
@@ -156,7 +156,6 @@ namespace ClientTests {
ASSERT_EQUALS(1U, db.getIndexSpecs(ns()).size());
db.ensureIndex(ns(), BSON("x" << 1), true);
- ctx.commit();
ASSERT_EQUALS(2, indexCatalog->numIndexesReady(&txn));
ASSERT_EQUALS(2U, db.getIndexSpecs(ns()).size());
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index cca1395c9dd..33b94af0ff0 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -177,7 +177,11 @@ namespace DocumentSourceTests {
CanonicalQuery* cq;
uassertStatusOK(CanonicalQuery::canonicalize(ns, /*query=*/BSONObj(), &cq));
PlanExecutor* execBare;
- uassertStatusOK(getExecutor(&_opCtx, ctx.getCollection(), cq, &execBare));
+ uassertStatusOK(getExecutor(&_opCtx,
+ ctx.getCollection(),
+ cq,
+ PlanExecutor::YIELD_MANUAL,
+ &execBare));
_exec.reset(execBare);
_exec->saveState();
diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp
index 6ad15d20cbb..821e81ac377 100644
--- a/src/mongo/dbtests/executor_registry.cpp
+++ b/src/mongo/dbtests/executor_registry.cpp
@@ -60,12 +60,6 @@ namespace ExecutorRegistry {
}
}
- ~ExecutorRegistryBase() {
- if (_ctx.get()) {
- _ctx->commit();
- }
- }
-
/**
* Return a plan executor that is going over the collection in ns().
*/
@@ -80,17 +74,33 @@ namespace ExecutorRegistry {
// Create a plan executor to hold it
CanonicalQuery* cq;
ASSERT(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK());
- // Owns all args
- return new PlanExecutor(&_opCtx, ws.release(), scan.release(), cq,
- _ctx->ctx().db()->getCollection( &_opCtx, ns() ));
+ PlanExecutor* exec;
+ // Takes ownership of 'ws', 'scan', and 'cq'.
+ Status status = PlanExecutor::make(&_opCtx,
+ ws.release(),
+ scan.release(),
+ cq,
+ _ctx->ctx().db()->getCollection(&_opCtx, ns()),
+ PlanExecutor::YIELD_MANUAL,
+ &exec);
+ ASSERT_OK(status);
+ return exec;
}
void registerExecutor( PlanExecutor* exec ) {
- _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->registerExecutor( exec );
+ WriteUnitOfWork wuow(&_opCtx);
+ _ctx->ctx().db()->getOrCreateCollection(&_opCtx, ns())
+ ->cursorCache()
+ ->registerExecutor(exec);
+ wuow.commit();
}
void deregisterExecutor( PlanExecutor* exec ) {
- _ctx->ctx().db()->getOrCreateCollection( &_opCtx, ns() )->cursorCache()->deregisterExecutor( exec );
+ WriteUnitOfWork wuow(&_opCtx);
+ _ctx->ctx().db()->getOrCreateCollection(&_opCtx, ns())
+ ->cursorCache()
+ ->deregisterExecutor(exec);
+ wuow.commit();
}
int N() { return 50; }
@@ -278,7 +288,6 @@ namespace ExecutorRegistry {
// Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB
// requires a "global write lock."
- _ctx->commit();
_ctx.reset();
_client.dropDatabase("somesillydb");
_ctx.reset(new Client::WriteContext(&_opCtx, ns()));
@@ -295,7 +304,6 @@ namespace ExecutorRegistry {
registerExecutor(run.get());
// Drop our DB. Once again, must give up the lock.
- _ctx->commit();
_ctx.reset();
_client.dropDatabase("unittests");
_ctx.reset(new Client::WriteContext(&_opCtx, ns()));
@@ -303,7 +311,6 @@ namespace ExecutorRegistry {
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState(&_opCtx);
- _ctx->commit();
_ctx.reset();
// PlanExecutor was killed.
diff --git a/src/mongo/dbtests/indexcatalogtests.cpp b/src/mongo/dbtests/indexcatalogtests.cpp
index a18488c542a..5d27c3d9200 100644
--- a/src/mongo/dbtests/indexcatalogtests.cpp
+++ b/src/mongo/dbtests/indexcatalogtests.cpp
@@ -34,24 +34,27 @@ namespace IndexCatalogTests {
IndexIteratorTests() {
OperationContextImpl txn;
Client::WriteContext ctx(&txn, _ns);
+ WriteUnitOfWork wuow(&txn);
_db = ctx.db();
_coll = _db->createCollection(&txn, _ns);
_catalog = _coll->getIndexCatalog();
- ctx.commit();
+ wuow.commit();
}
~IndexIteratorTests() {
OperationContextImpl txn;
Client::WriteContext ctx(&txn, _ns);
+ WriteUnitOfWork wuow(&txn);
_db->dropCollection(&txn, _ns);
- ctx.commit();
+ wuow.commit();
}
void run() {
OperationContextImpl txn;
Client::WriteContext ctx(&txn, _ns);
+ WriteUnitOfWork wuow(&txn);
int numFinishedIndexesStart = _catalog->numIndexesReady(&txn);
@@ -77,7 +80,7 @@ namespace IndexCatalogTests {
}
}
- ctx.commit();
+ wuow.commit();
ASSERT_TRUE(indexesIterated == _catalog->numIndexesReady(&txn));
ASSERT_TRUE(foundIndex);
}
diff --git a/src/mongo/dbtests/indexupdatetests.cpp b/src/mongo/dbtests/indexupdatetests.cpp
index 28d1d707289..fe8200fee38 100644
--- a/src/mongo/dbtests/indexupdatetests.cpp
+++ b/src/mongo/dbtests/indexupdatetests.cpp
@@ -55,13 +55,14 @@ namespace IndexUpdateTests {
public:
IndexBuildBase() :
_ctx(&_txn, _ns),
+ _wunit(&_txn),
_client(&_txn) {
_client.createCollection( _ns );
}
~IndexBuildBase() {
_client.dropCollection( _ns );
- _ctx.commit(); // just for testing purposes
+ _wunit.commit(); // just for testing purposes
getGlobalEnvironment()->unsetKillAllOperations();
}
Collection* collection() {
@@ -117,6 +118,7 @@ namespace IndexUpdateTests {
OperationContextImpl _txn;
Client::WriteContext _ctx;
+ WriteUnitOfWork _wunit;
DBDirectClient _client;
};
diff --git a/src/mongo/dbtests/plan_ranking.cpp b/src/mongo/dbtests/plan_ranking.cpp
index 6018e7f2616..41ec196a6fe 100644
--- a/src/mongo/dbtests/plan_ranking.cpp
+++ b/src/mongo/dbtests/plan_ranking.cpp
@@ -72,7 +72,6 @@ namespace PlanRankingTests {
Client::WriteContext ctx(&_txn, ns);
_client.dropCollection(ns);
- ctx.commit();
}
virtual ~PlanRankingTestBase() {
@@ -84,13 +83,11 @@ namespace PlanRankingTests {
void insert(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns);
_client.insert(ns, obj);
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns);
_client.ensureIndex(ns, obj);
- ctx.commit();
}
/**
@@ -125,8 +122,9 @@ namespace PlanRankingTests {
// Takes ownership of all (actually some) arguments.
_mps->addPlan(solutions[i], root, ws.get());
}
-
- _mps->pickBestPlan(); // This is what sets a backup plan, should we test for it.
+ // This is what sets a backup plan, should we test for it. NULL means that there
+ // is no yield policy for this MultiPlanStage's plan selection.
+ _mps->pickBestPlan(NULL);
ASSERT(_mps->bestPlanChosen());
size_t bestPlanIdx = _mps->bestPlanIdx();
diff --git a/src/mongo/dbtests/query_multi_plan_runner.cpp b/src/mongo/dbtests/query_multi_plan_runner.cpp
index 8e622df8507..7da7cb7c28c 100644
--- a/src/mongo/dbtests/query_multi_plan_runner.cpp
+++ b/src/mongo/dbtests/query_multi_plan_runner.cpp
@@ -72,31 +72,26 @@ namespace QueryMultiPlanRunner {
MultiPlanRunnerBase() : _client(&_txn) {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
virtual ~MultiPlanRunnerBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns());
_client.ensureIndex(ns(), obj);
- ctx.commit();
}
void insert(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns());
_client.insert(ns(), obj);
- ctx.commit();
}
void remove(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns());
_client.remove(ns(), obj);
- ctx.commit();
}
static const char* ns() { return "unittests.QueryStageMultiPlanRunner"; }
@@ -160,18 +155,23 @@ namespace QueryMultiPlanRunner {
mps->addPlan(createQuerySolution(), firstRoot.release(), sharedWs.get());
mps->addPlan(createQuerySolution(), secondRoot.release(), sharedWs.get());
- // Plan 0 aka the first plan aka the index scan should be the best.
- mps->pickBestPlan();
+ // Plan 0 aka the first plan aka the index scan should be the best. NULL means that
+ // 'mps' will not yield during plan selection.
+ mps->pickBestPlan(NULL);
ASSERT(mps->bestPlanChosen());
ASSERT_EQUALS(0, mps->bestPlanIdx());
// Takes ownership of arguments other than 'collection'.
- PlanExecutor exec(&_txn, sharedWs.release(), mps, cq, coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, sharedWs.release(), mps, cq, coll,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
// Get all our results out.
int results = 0;
BSONObj obj;
- while (PlanExecutor::ADVANCED == exec.getNext(&obj, NULL)) {
+ while (PlanExecutor::ADVANCED == exec->getNext(&obj, NULL)) {
ASSERT_EQUALS(obj["foo"].numberInt(), 7);
++results;
}
@@ -235,8 +235,8 @@ namespace QueryMultiPlanRunner {
mps->addPlan(solutions[i], root, ws.get());
}
- // This sets a backup plan.
- mps->pickBestPlan();
+ // This sets a backup plan. NULL means that 'mps' will not yield.
+ mps->pickBestPlan(NULL);
ASSERT(mps->bestPlanChosen());
ASSERT(mps->hasBackupPlan());
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 693c8e3a72c..1fa7212831d 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -97,8 +97,11 @@ namespace QueryPlanExecutor {
// Make the stage.
auto_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root()));
+ PlanExecutor* exec;
// Hand the plan off to the executor.
- PlanExecutor* exec = new PlanExecutor(&_txn, ws.release(), root.release(), cq, coll);
+ Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll,
+ PlanExecutor::YIELD_MANUAL, &exec);
+ ASSERT_OK(stat);
return exec;
}
@@ -136,8 +139,12 @@ namespace QueryPlanExecutor {
verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK());
verify(NULL != cq);
+ PlanExecutor* exec;
// Hand the plan off to the executor.
- return new PlanExecutor(&_txn, ws.release(), root.release(), cq, coll);
+ Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll,
+ PlanExecutor::YIELD_MANUAL, &exec);
+ ASSERT_OK(stat);
+ return exec;
}
static const char* ns() { return "unittests.QueryPlanExecutor"; }
@@ -153,15 +160,19 @@ namespace QueryPlanExecutor {
void registerExec( PlanExecutor* exec ) {
// TODO: This is not correct (create collection under S-lock)
AutoGetCollectionForRead ctx(&_txn, ns());
+ WriteUnitOfWork wunit(&_txn);
Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
collection->cursorCache()->registerExecutor( exec );
+ wunit.commit();
}
void deregisterExec( PlanExecutor* exec ) {
// TODO: This is not correct (create collection under S-lock)
AutoGetCollectionForRead ctx(&_txn, ns());
+ WriteUnitOfWork wunit(&_txn);
Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
collection->cursorCache()->deregisterExecutor( exec );
+ wunit.commit();
}
protected:
@@ -203,7 +214,6 @@ namespace QueryPlanExecutor {
ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
deregisterExec(exec.get());
- ctx.commit();
}
};
@@ -233,7 +243,6 @@ namespace QueryPlanExecutor {
ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
deregisterExec(exec.get());
- ctx.commit();
}
};
@@ -270,8 +279,12 @@ namespace QueryPlanExecutor {
std::auto_ptr<PipelineProxyStage> proxy(
new PipelineProxyStage(pipeline, innerExec, ws.get()));
Collection* collection = ctx.getCollection();
- boost::scoped_ptr<PlanExecutor> outerExec(
- new PlanExecutor(&_txn, ws.release(), proxy.release(), collection));
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws.release(), proxy.release(), collection,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> outerExec(rawExec);
// Only the outer executor gets registered.
registerExec(outerExec.get());
@@ -284,7 +297,6 @@ namespace QueryPlanExecutor {
ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
deregisterExec(outerExec.get());
- ctx.commit();
}
};
@@ -352,7 +364,6 @@ namespace QueryPlanExecutor {
int ids[] = {3, 4, 2};
checkIds(ids, exec.get());
- ctx.commit();
}
};
@@ -382,7 +393,6 @@ namespace QueryPlanExecutor {
// we should not see the moved document again.
int ids[] = {3, 4};
checkIds(ids, exec.get());
- ctx.commit();
}
};
@@ -412,7 +422,6 @@ namespace QueryPlanExecutor {
ASSERT_EQUALS(1U, numCursors());
coll->cursorCache()->invalidateAll(false);
ASSERT_EQUALS(0U, numCursors());
- ctx.commit();
}
};
@@ -449,7 +458,6 @@ namespace QueryPlanExecutor {
// number of cursors to return to 0.
ccPin.deleteUnderlying();
ASSERT_EQUALS(0U, numCursors());
- ctx.commit();
}
};
@@ -463,7 +471,6 @@ namespace QueryPlanExecutor {
{
Client::WriteContext ctx(&_txn, ns());
insert(BSON("a" << 1 << "b" << 1));
- ctx.commit();
}
{
diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp
index c2e3a3630fb..645aba10cd2 100644
--- a/src/mongo/dbtests/query_stage_and.cpp
+++ b/src/mongo/dbtests/query_stage_and.cpp
@@ -160,7 +160,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -249,7 +251,6 @@ namespace QueryStageAnd {
ASSERT_GREATER_THAN_OR_EQUALS(elt.numberInt(), 10);
}
- ctx.commit();
ASSERT_EQUALS(10, count);
}
};
@@ -262,7 +263,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -335,7 +338,6 @@ namespace QueryStageAnd {
++count;
}
- ctx.commit();
ASSERT_EQUALS(count, 20);
}
};
@@ -348,7 +350,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -378,7 +382,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// foo == bar == baz, and foo<=20, bar>=10, so our values are:
// foo == 10, 11, 12, 13, 14, 15. 16, 17, 18, 19, 20
@@ -397,7 +400,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Generate large keys for {foo: 1, big: 1} index.
@@ -432,7 +437,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// Stage execution should fail.
ASSERT_EQUALS(-1, countResults(ah.get()));
@@ -449,7 +453,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Generate large keys for {baz: 1, big: 1} index.
@@ -484,7 +490,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// foo == bar == baz, and foo<=20, bar>=10, so our values are:
// foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20.
@@ -500,7 +505,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -539,7 +546,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// foo == bar == baz, and foo<=20, bar>=10, 5<=baz<=15, so our values are:
// foo == 10, 11, 12, 13, 14, 15.
@@ -561,7 +567,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Generate large keys for {bar: 1, big: 1} index.
@@ -605,7 +613,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// Stage execution should fail.
ASSERT_EQUALS(-1, countResults(ah.get()));
@@ -620,7 +627,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -660,7 +669,6 @@ namespace QueryStageAnd {
if (PlanStage::ADVANCED != status) { continue; }
++count;
}
- ctx.commit();
ASSERT_EQUALS(0, count);
@@ -679,7 +687,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 10; ++i) {
@@ -713,7 +723,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = false;
params.direction = -1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
ASSERT_EQUALS(0, countResults(ah.get()));
}
@@ -727,7 +736,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -761,7 +772,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// Bar == 97
ASSERT_EQUALS(1, countResults(ah.get()));
@@ -779,7 +789,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -814,7 +826,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// Check that the AndHash stage returns docs {foo: 10, bar: 10}
// through {foo: 20, bar: 20}.
@@ -837,7 +848,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -872,7 +885,6 @@ namespace QueryStageAnd {
// constructor means there is no filter.
FetchStage* fetch = new FetchStage(&_txn, &ws, secondScan, NULL, coll);
ah->addChild(fetch);
- ctx.commit();
// Check that the AndHash stage returns docs {foo: 10, bar: 10}
// through {foo: 20, bar: 20}.
@@ -900,7 +912,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Insert a bunch of data
@@ -1000,7 +1014,6 @@ namespace QueryStageAnd {
ASSERT_TRUE(member->getFieldDotted("bar", &elt));
ASSERT_EQUALS(1, elt.numberInt());
}
- ctx.commit();
ASSERT_EQUALS(count, 48);
@@ -1017,7 +1030,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Insert a bunch of data
@@ -1056,7 +1071,6 @@ namespace QueryStageAnd {
// baz == 1
params.descriptor = getIndex(BSON("baz" << 1), coll);
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
ASSERT_EQUALS(50, countResults(ah.get()));
}
@@ -1070,10 +1084,11 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
-
for (int i = 0; i < 50; ++i) {
insert(BSON("foo" << 8 << "bar" << 20));
}
@@ -1101,7 +1116,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
ASSERT_EQUALS(0, countResults(ah.get()));
}
@@ -1115,7 +1129,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -1149,7 +1165,6 @@ namespace QueryStageAnd {
params.bounds.endKeyInclusive = true;
params.direction = 1;
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
ASSERT_EQUALS(0, countResults(ah.get()));
}
@@ -1163,7 +1178,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -1193,7 +1210,6 @@ namespace QueryStageAnd {
// bar == 1
params.descriptor = getIndex(BSON("bar" << 1), coll);
ah->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
// Filter drops everything.
ASSERT_EQUALS(0, countResults(ah.get()));
@@ -1208,7 +1224,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
for (int i = 0; i < 50; ++i) {
@@ -1253,7 +1271,6 @@ namespace QueryStageAnd {
}
lastId = id;
}
- ctx.commit();
ASSERT_EQUALS(count, 43);
}
@@ -1270,7 +1287,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Insert a bunch of data
@@ -1302,7 +1321,6 @@ namespace QueryStageAnd {
// bar == 1
params.descriptor = getIndex(BSON("bar" << 1), coll);
as->addChild(new IndexScan(&_txn, params, &ws, NULL));
- ctx.commit();
for (int i = 0; i < 50; i++) {
BSONObj obj = getNext(as.get(), &ws);
@@ -1323,7 +1341,9 @@ namespace QueryStageAnd {
Database* db = ctx.db();
Collection* coll = ctx.getCollection();
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
// Insert a bunch of data
@@ -1355,7 +1375,6 @@ namespace QueryStageAnd {
// constructor means there is no filter.
FetchStage* fetch = new FetchStage(&_txn, &ws, secondScan, NULL, coll);
as->addChild(fetch);
- ctx.commit();
for (int i = 0; i < 50; i++) {
BSONObj obj = getNext(as.get(), &ws);
diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp
index 9cbf2dc22f7..db759de784f 100644
--- a/src/mongo/dbtests/query_stage_collscan.cpp
+++ b/src/mongo/dbtests/query_stage_collscan.cpp
@@ -60,13 +60,11 @@ namespace QueryStageCollectionScan {
bob.append("foo", i);
_client.insert(ns(), bob.obj());
}
- ctx.commit();
}
virtual ~QueryStageCollectionScanBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void remove(const BSONObj& obj) {
@@ -90,11 +88,16 @@ namespace QueryStageCollectionScan {
// Make a scan and have the runner own it.
WorkingSet* ws = new WorkingSet();
PlanStage* ps = new CollectionScan(&_txn, params, ws, filterExpr.get());
- PlanExecutor runner(&_txn, ws, ps, params.collection);
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, ps, params.collection,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
// Use the runner to count the number of objects scanned.
int count = 0;
- for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) { ++count; }
+ for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) { ++count; }
return count;
}
@@ -195,10 +198,15 @@ namespace QueryStageCollectionScan {
// Make a scan and have the runner own it.
WorkingSet* ws = new WorkingSet();
PlanStage* ps = new CollectionScan(&_txn, params, ws, NULL);
- PlanExecutor runner(&_txn, ws, ps, params.collection);
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, ps, params.collection,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
int count = 0;
- for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) {
+ for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) {
// Make sure we get the objects in the order we want
ASSERT_EQUALS(count, obj["foo"].numberInt());
++count;
@@ -224,10 +232,15 @@ namespace QueryStageCollectionScan {
WorkingSet* ws = new WorkingSet();
PlanStage* ps = new CollectionScan(&_txn, params, ws, NULL);
- PlanExecutor runner(&_txn, ws, ps, params.collection);
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, ps, params.collection,
+ PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
int count = 0;
- for (BSONObj obj; PlanExecutor::ADVANCED == runner.getNext(&obj, NULL); ) {
+ for (BSONObj obj; PlanExecutor::ADVANCED == exec->getNext(&obj, NULL); ) {
++count;
ASSERT_EQUALS(numObj() - count, obj["foo"].numberInt());
}
@@ -293,7 +306,6 @@ namespace QueryStageCollectionScan {
++count;
}
}
- ctx.commit();
ASSERT_EQUALS(numObj(), count);
}
@@ -355,7 +367,6 @@ namespace QueryStageCollectionScan {
++count;
}
}
- ctx.commit();
ASSERT_EQUALS(numObj(), count);
}
diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp
index 8e18509342e..f0d64edaa1c 100644
--- a/src/mongo/dbtests/query_stage_count.cpp
+++ b/src/mongo/dbtests/query_stage_count.cpp
@@ -55,7 +55,6 @@ namespace QueryStageCount {
virtual ~CountBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
@@ -117,7 +116,6 @@ namespace QueryStageCount {
// Add an index on a:1
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up the count stage
CountScanParams params;
@@ -151,7 +149,6 @@ namespace QueryStageCount {
// Add an index
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up the count stage
CountScanParams params;
@@ -184,7 +181,6 @@ namespace QueryStageCount {
// Add an index
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up the count stage
CountScanParams params;
@@ -213,7 +209,6 @@ namespace QueryStageCount {
// Insert doc, add index
insert(BSON("a" << 2));
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count, and run
CountScanParams params;
@@ -243,7 +238,6 @@ namespace QueryStageCount {
insert(BSON("a" << 2));
insert(BSON("a" << 3));
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count, and run
CountScanParams params;
@@ -274,7 +268,6 @@ namespace QueryStageCount {
insert(BSON("a" << 2));
insert(BSON("a" << 4));
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count, and run
CountScanParams params;
@@ -306,7 +299,6 @@ namespace QueryStageCount {
insert(BSON("a" << i));
}
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count stage
CountScanParams params;
@@ -358,7 +350,6 @@ namespace QueryStageCount {
insert(BSON("a" << i));
}
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count stage
CountScanParams params;
@@ -386,7 +377,6 @@ namespace QueryStageCount {
// Remove remaining objects
remove(BSON("a" << GTE << 5));
- ctx.commit();
// Recover from yield
count.restoreState(&_txn);
@@ -414,7 +404,6 @@ namespace QueryStageCount {
insert(BSON("a" << i));
}
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count stage
CountScanParams params;
@@ -445,7 +434,6 @@ namespace QueryStageCount {
// Insert one document after the end
insert(BSON("a" << 6.5));
- ctx.commit();
// Recover from yield
count.restoreState(&_txn);
@@ -473,7 +461,6 @@ namespace QueryStageCount {
insert(BSON("a" << i));
}
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count stage
CountScanParams params;
@@ -501,7 +488,6 @@ namespace QueryStageCount {
// Insert a document with two values for 'a'
insert(BSON("a" << BSON_ARRAY(10 << 11)));
- ctx.commit();
// Recover from yield
count.restoreState(&_txn);
@@ -533,7 +519,6 @@ namespace QueryStageCount {
remove(BSON("a" << 1 << "b" << 0));
remove(BSON("a" << 1 << "b" << 3));
remove(BSON("a" << 1 << "b" << 4));
- ctx.commit();
// Ensure that count does not include unused keys
CountScanParams params;
@@ -567,7 +552,6 @@ namespace QueryStageCount {
// Mark key at end position as 'unused' by deleting
remove(BSON("a" << 1 << "b" << 9));
- ctx.commit();
// Run count and check
CountScanParams params;
@@ -598,7 +582,6 @@ namespace QueryStageCount {
insert(BSON("a" << 1 << "b" << i));
}
addIndex(BSON("a" << 1));
- ctx.commit();
// Set up count stage
CountScanParams params;
@@ -626,7 +609,6 @@ namespace QueryStageCount {
// Mark the key at position 5 as 'unused'
remove(BSON("a" << 1 << "b" << 5));
- ctx.commit();
// Recover from yield
count.restoreState(&_txn);
diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp
index 19ce4fe37cd..2c905e184cb 100644
--- a/src/mongo/dbtests/query_stage_delete.cpp
+++ b/src/mongo/dbtests/query_stage_delete.cpp
@@ -54,13 +54,11 @@ namespace QueryStageDelete {
bob.append("foo", static_cast<long long int>(i));
_client.insert(ns(), bob.obj());
}
- ctx.commit();
}
virtual ~QueryStageDeleteBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void remove(const BSONObj& obj) {
@@ -158,8 +156,6 @@ namespace QueryStageDelete {
}
ASSERT_EQUALS(numObj() - 1, stats->docsDeleted);
-
- ctx.commit();
}
};
diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp
index cd06b3e6fdb..713103d74f1 100644
--- a/src/mongo/dbtests/query_stage_fetch.cpp
+++ b/src/mongo/dbtests/query_stage_fetch.cpp
@@ -88,12 +88,14 @@ namespace QueryStageFetch {
public:
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
+
WorkingSet ws;
// Add an object to the DB.
@@ -101,7 +103,6 @@ namespace QueryStageFetch {
set<DiskLoc> locs;
getLocs(&locs, coll);
ASSERT_EQUALS(size_t(1), locs.size());
- ctx.commit();
// Create a mock stage that returns the WSM.
auto_ptr<MockStage> mockStage(new MockStage(&ws));
@@ -147,12 +148,14 @@ namespace QueryStageFetch {
public:
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
+
WorkingSet ws;
// Add an object to the DB.
@@ -197,7 +200,6 @@ namespace QueryStageFetch {
// No more data to fetch, so, EOF.
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::IS_EOF, state);
- ctx.commit();
}
};
diff --git a/src/mongo/dbtests/query_stage_keep.cpp b/src/mongo/dbtests/query_stage_keep.cpp
index 2bb6373d865..91c8fb639ba 100644
--- a/src/mongo/dbtests/query_stage_keep.cpp
+++ b/src/mongo/dbtests/query_stage_keep.cpp
@@ -105,12 +105,14 @@ namespace QueryStageKeep {
public:
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
+
WorkingSet ws;
// Add 10 objects to the collection.
@@ -126,7 +128,6 @@ namespace QueryStageKeep {
member->obj = BSON("x" << 2);
ws.flagForReview(id);
}
- ctx.commit();
// Create a collscan to provide the 10 objects in the collection.
CollectionScanParams params;
diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp
index ab4c027aaf7..a318da31114 100644
--- a/src/mongo/dbtests/query_stage_merge_sort.cpp
+++ b/src/mongo/dbtests/query_stage_merge_sort.cpp
@@ -54,7 +54,6 @@ namespace QueryStageMergeSortTests {
virtual ~QueryStageMergeSortTestBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
@@ -114,7 +113,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
const int N = 50;
@@ -149,15 +150,18 @@ namespace QueryStageMergeSortTests {
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
- ctx.commit();
// Must fetch if we want to easily pull out an obj.
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
for (int i = 0; i < N; ++i) {
BSONObj first, second;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL));
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b"))
@@ -166,7 +170,7 @@ namespace QueryStageMergeSortTests {
// Should be done now.
BSONObj foo;
- ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&foo, NULL));
+ ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL));
}
};
@@ -178,7 +182,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
const int N = 50;
@@ -213,14 +219,17 @@ namespace QueryStageMergeSortTests {
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
- ctx.commit();
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
for (int i = 0; i < N; ++i) {
BSONObj first, second;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL));
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b"))
@@ -229,7 +238,7 @@ namespace QueryStageMergeSortTests {
// Should be done now.
BSONObj foo;
- ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL));
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
@@ -241,7 +250,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
const int N = 50;
@@ -276,15 +287,18 @@ namespace QueryStageMergeSortTests {
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
- ctx.commit();
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
for (int i = 0; i < N; ++i) {
BSONObj first, second;
// We inserted N objects but we get 2 * N from the runner because of dups.
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL));
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b"))
@@ -293,7 +307,7 @@ namespace QueryStageMergeSortTests {
// Should be done now.
BSONObj foo;
- ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL));
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
@@ -305,7 +319,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
const int N = 50;
@@ -342,14 +358,17 @@ namespace QueryStageMergeSortTests {
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
- ctx.commit();
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
for (int i = 0; i < N; ++i) {
BSONObj first, second;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&first, NULL));
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&second, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(N - i - 1, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b"))
@@ -358,7 +377,7 @@ namespace QueryStageMergeSortTests {
// Should be done now.
BSONObj foo;
- ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL));
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
@@ -370,7 +389,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
const int N = 50;
@@ -407,21 +428,24 @@ namespace QueryStageMergeSortTests {
params.bounds.startKey = BSON("" << 51 << "" << MinKey);
params.bounds.endKey = BSON("" << 51 << "" << MaxKey);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
- ctx.commit();
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
// Only getting results from the a:1 index scan.
for (int i = 0; i < N; ++i) {
BSONObj obj;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&obj, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["c"].numberInt());
ASSERT_EQUALS(1, obj["a"].numberInt());
}
// Should be done now.
BSONObj foo;
- ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL));
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
@@ -433,7 +457,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
WorkingSet* ws = new WorkingSet();
@@ -460,13 +486,16 @@ namespace QueryStageMergeSortTests {
params.descriptor = getIndex(indexSpec, coll);
ms->addChild(new IndexScan(&_txn, params, ws, NULL));
}
- ctx.commit();
- PlanExecutor runner(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll);
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
for (int i = 0; i < numIndices; ++i) {
BSONObj obj;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&obj, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
string index(1, 'a' + i);
ASSERT_EQUALS(1, obj[index].numberInt());
@@ -474,7 +503,7 @@ namespace QueryStageMergeSortTests {
// Should be done now.
BSONObj foo;
- ASSERT_EQUALS(PlanExecutor::IS_EOF, runner.getNext(&foo, NULL));
+ ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
@@ -486,7 +515,9 @@ namespace QueryStageMergeSortTests {
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
WorkingSet ws;
@@ -520,7 +551,6 @@ namespace QueryStageMergeSortTests {
getLocs(&locs, coll);
set<DiskLoc>::iterator it = locs.begin();
- ctx.commit();
// Get 10 results. Should be getting results in order of 'locs'.
int count = 0;
diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp
index 20834444537..0092f6ccec6 100644
--- a/src/mongo/dbtests/query_stage_sort.cpp
+++ b/src/mongo/dbtests/query_stage_sort.cpp
@@ -122,22 +122,26 @@ namespace QueryStageSortTests {
params.limit = limit();
// Must fetch so we can look at the doc as a BSONObj.
- PlanExecutor runner(&_txn,
- ws,
- new FetchStage(&_txn, ws,
- new SortStage(&_txn, params, ws, ms), NULL, coll),
- coll);
+ PlanExecutor* rawExec;
+ Status status =
+ PlanExecutor::make(&_txn,
+ ws,
+ new FetchStage(&_txn, ws,
+ new SortStage(&_txn, params, ws, ms), NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
// Look at pairs of objects to make sure that the sort order is pairwise (and therefore
// totally) correct.
BSONObj last;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, runner.getNext(&last, NULL));
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&last, NULL));
// Count 'last'.
int count = 1;
BSONObj current;
- while (PlanExecutor::ADVANCED == runner.getNext(&current, NULL)) {
+ while (PlanExecutor::ADVANCED == exec->getNext(&current, NULL)) {
int cmp = sgn(current.woSortOrder(last, params.pattern));
// The next object should be equal to the previous or oriented according to the sort
// pattern.
@@ -185,16 +189,16 @@ namespace QueryStageSortTests {
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
fillData();
sortAndCheck(1, coll);
- ctx.commit();
}
};
@@ -205,16 +209,16 @@ namespace QueryStageSortTests {
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
fillData();
sortAndCheck(-1, coll);
- ctx.commit();
}
};
@@ -234,16 +238,16 @@ namespace QueryStageSortTests {
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
fillData();
sortAndCheck(-1, coll);
- ctx.commit();
}
};
@@ -254,12 +258,14 @@ namespace QueryStageSortTests {
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
+
fillData();
// The data we're going to later invalidate.
@@ -319,7 +325,6 @@ namespace QueryStageSortTests {
ASSERT(!member->hasLoc());
++count;
}
- ctx.commit();
// Returns all docs.
ASSERT_EQUALS(limit() ? limit() : numObj(), count);
@@ -345,11 +350,12 @@ namespace QueryStageSortTests {
void run() {
Client::WriteContext ctx(&_txn, ns());
-
Database* db = ctx.ctx().db();
Collection* coll = db->getCollection(&_txn, ns());
if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
+ wuow.commit();
}
WorkingSet* ws = new WorkingSet();
@@ -372,15 +378,18 @@ namespace QueryStageSortTests {
params.limit = 0;
// We don't get results back since we're sorting some parallel arrays.
- PlanExecutor runner(&_txn,
- ws,
- new FetchStage(&_txn,
- ws,
- new SortStage(&_txn, params, ws, ms), NULL, coll),
- coll);
- PlanExecutor::ExecState runnerState = runner.getNext(NULL, NULL);
+ PlanExecutor* rawExec;
+ Status status =
+ PlanExecutor::make(&_txn,
+ ws,
+ new FetchStage(&_txn,
+ ws,
+ new SortStage(&_txn, params, ws, ms), NULL, coll),
+ coll, PlanExecutor::YIELD_MANUAL, &rawExec);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
+
+ PlanExecutor::ExecState runnerState = exec->getNext(NULL, NULL);
ASSERT_EQUALS(PlanExecutor::EXEC_ERROR, runnerState);
- ctx.commit();
}
};
diff --git a/src/mongo/dbtests/query_stage_subplan.cpp b/src/mongo/dbtests/query_stage_subplan.cpp
index d5f1c5e5801..e431fcb68db 100644
--- a/src/mongo/dbtests/query_stage_subplan.cpp
+++ b/src/mongo/dbtests/query_stage_subplan.cpp
@@ -47,7 +47,6 @@ namespace QueryStageSubplan {
virtual ~QueryStageSubplanBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
@@ -93,12 +92,13 @@ namespace QueryStageSubplan {
QueryPlannerParams plannerParams;
fillOutPlannerParams(&_txn, collection, cq, &plannerParams);
- // We expect creation of the subplan stage to fail.
WorkingSet ws;
- SubplanStage* subplan;
- ASSERT_NOT_OK(SubplanStage::make(&_txn, collection, &ws, plannerParams, cq, &subplan));
+ boost::scoped_ptr<SubplanStage> subplan(new SubplanStage(&_txn, collection, &ws,
+ plannerParams, cq));
- ctx.commit();
+ // NULL means that 'subplan' will not yield during plan selection. Plan selection
+ // should succeed due to falling back on regular planning.
+ ASSERT_OK(subplan->pickBestPlan(NULL));
}
};
diff --git a/src/mongo/dbtests/query_stage_tests.cpp b/src/mongo/dbtests/query_stage_tests.cpp
index ea59b791c5a..c0b203ef0b9 100644
--- a/src/mongo/dbtests/query_stage_tests.cpp
+++ b/src/mongo/dbtests/query_stage_tests.cpp
@@ -59,19 +59,16 @@ namespace QueryStageTests {
addIndex(BSON("foo" << 1));
addIndex(BSON("foo" << 1 << "baz" << 1));
- ctx.commit();
}
virtual ~IndexScanBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void addIndex(const BSONObj& obj) {
Client::WriteContext ctx(&_txn, ns());
_client.ensureIndex(ns(), obj);
- ctx.commit();
}
int countResults(const IndexScanParams& params, BSONObj filterObj = BSONObj()) {
@@ -82,13 +79,19 @@ namespace QueryStageTests {
auto_ptr<MatchExpression> filterExpr(swme.getValue());
WorkingSet* ws = new WorkingSet();
- PlanExecutor runner(&_txn,
- ws,
- new IndexScan(&_txn, params, ws, filterExpr.get()),
- ctx.getCollection());
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(&_txn,
+ ws,
+ new IndexScan(&_txn, params, ws, filterExpr.get()),
+ ctx.getCollection(),
+ PlanExecutor::YIELD_MANUAL,
+ &rawExec);
+ ASSERT_OK(status);
+ boost::scoped_ptr<PlanExecutor> exec(rawExec);
int count = 0;
- for (DiskLoc dl; PlanExecutor::ADVANCED == runner.getNext(NULL, &dl); ) {
+ for (DiskLoc dl; PlanExecutor::ADVANCED == exec->getNext(NULL, &dl); ) {
++count;
}
@@ -103,7 +106,6 @@ namespace QueryStageTests {
double lng = double(rand()) / RAND_MAX;
_client.insert(ns(), BSON("geo" << BSON_ARRAY(lng << lat)));
}
- ctx.commit();
}
IndexDescriptor* getIndex(const BSONObj& obj) {
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
index 2ca29438a93..7be23ea50d5 100644
--- a/src/mongo/dbtests/query_stage_update.cpp
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -57,13 +57,11 @@ namespace QueryStageUpdate {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
_client.createCollection(ns());
- ctx.commit();
}
virtual ~QueryStageUpdateBase() {
Client::WriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
- ctx.commit();
}
void insert(const BSONObj& doc) {
@@ -218,7 +216,6 @@ namespace QueryStageUpdate {
new UpdateStage(params, ws.get(), db, eofStage.release()));
runUpdate(updateStage.get());
- ctx.commit();
}
// Verify the contents of the resulting collection.
@@ -323,8 +320,6 @@ namespace QueryStageUpdate {
ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state);
}
- ctx.commit();
-
// 4 of the 5 matching documents should have been modified (one was deleted).
ASSERT_EQUALS(4U, stats->nModified);
ASSERT_EQUALS(4U, stats->nMatched);
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 4467de0a3d8..57b9632e94e 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -261,7 +261,6 @@ namespace QueryTests {
// ASSERT( clientCursor.c()->pq );
// ASSERT_EQUALS( 2, clientCursor.c()->pq->getNumToReturn() );
ASSERT_EQUALS( 2, clientCursor.c()->pos() );
- ctx.commit();
}
cursor = _client.getMore( ns, cursorId );
@@ -1206,8 +1205,13 @@ namespace QueryTests {
// note that extents are always at least 4KB now - so this will get rounded up
// a bit.
- ASSERT( userCreateNS(&_txn, ctx.db(), ns(),
- fromjson( "{ capped : true, size : 2000 }" ), false ).isOK() );
+ {
+ WriteUnitOfWork wunit(&_txn);
+ ASSERT( userCreateNS(&_txn, ctx.db(), ns(),
+ fromjson( "{ capped : true, size : 2000 }" ), false ).isOK() );
+ wunit.commit();
+ }
+
for (int i = 0; i < 200; i++) {
insertNext();
ASSERT(count() < 90);
@@ -1230,7 +1234,6 @@ namespace QueryTests {
for ( int i=0; i<90; i++ ) {
insertNext();
}
- ctx.commit();
while ( c->more() ) { c->next(); }
}
@@ -1257,7 +1260,6 @@ namespace QueryTests {
for ( int i=0; i<50; i++ ) {
insert( ns() , BSON( "_id" << i << "x" << i * 2 ) );
}
- ctx.commit();
ASSERT_EQUALS( 50 , count() );
@@ -1312,7 +1314,6 @@ namespace QueryTests {
for ( int i=0; i<1000; i+=2 ) {
_client.remove( ns() , BSON( "_id" << i ) );
}
- ctx.commit();
BSONObj res;
for ( int i=0; i<1000; i++ ) {
@@ -1333,7 +1334,6 @@ namespace QueryTests {
for ( int i=0; i<1000; i++ ) {
insert( ns() , BSON( "_id" << i << "x" << i * 2 ) );
}
- ctx.commit();
}
};
@@ -1532,7 +1532,6 @@ namespace QueryTests {
str::stream() << "Cannot kill active cursor " << cursorId;
ASSERT_THROWS_WHAT(CollectionCursorCache::eraseCursorGlobal(&_txn, cursorId),
MsgAssertionException, expectedAssertion);
- ctx.commit();
}
// Verify that the remaining document is read from the cursor.
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index c013594f4da..7d036168084 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -77,6 +77,7 @@ namespace ReplTests {
createOplog(&_txn);
Client::WriteContext ctx(&_txn, ns());
+ WriteUnitOfWork wuow(&_txn);
Collection* c = ctx.ctx().db()->getCollection(&_txn, ns());
if ( ! c ) {
@@ -84,7 +85,7 @@ namespace ReplTests {
}
ASSERT(c->getIndexCatalog()->haveIdIndex(&_txn));
- ctx.commit();
+ wuow.commit();
}
~Base() {
try {
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 9a3248a8578..a992d044be4 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -441,8 +441,15 @@ namespace mongo {
invariant( _deleteNotifyExec.get() == NULL );
WorkingSet* ws = new WorkingSet();
DeleteNotificationStage* dns = new DeleteNotificationStage();
+ PlanExecutor* deleteNotifyExec;
// Takes ownership of 'ws' and 'dns'.
- PlanExecutor* deleteNotifyExec = new PlanExecutor(txn, ws, dns, collection);
+ Status execStatus = PlanExecutor::make(txn,
+ ws,
+ dns,
+ collection,
+ PlanExecutor::YIELD_MANUAL,
+ &deleteNotifyExec);
+ invariant(execStatus.isOK());
deleteNotifyExec->registerExec();
_deleteNotifyExec.reset(deleteNotifyExec);
@@ -698,7 +705,6 @@ namespace mongo {
return NULL;
}
virtual std::vector<PlanStage*> getChildren() const {
- invariant( false );
vector<PlanStage*> empty;
return empty;
}
@@ -1730,13 +1736,14 @@ namespace mongo {
if ( entry["options"].isABSONObj() )
options = entry["options"].Obj();
+ WriteUnitOfWork wuow(txn);
Status status = userCreateNS( txn, db, ns, options, true, false );
if ( !status.isOK() ) {
warning() << "failed to create collection [" << ns << "] "
<< " with options " << options << ": " << status;
}
+ wuow.commit();
}
- ctx.commit();
}
{
@@ -1912,7 +1919,6 @@ namespace mongo {
}
Helpers::upsert( txn, ns, o, true );
- cx.commit();
}
thisTime++;
numCloned++;
@@ -2121,7 +2127,6 @@ namespace mongo {
BSONObjIterator i( xfer["deleted"].Obj() );
while ( i.more() ) {
Lock::CollectionLock clk(txn->lockState(), ns, MODE_X);
- WriteUnitOfWork wunit(txn);
Client::Context ctx(txn, ns);
BSONObj id = i.next().Obj();
@@ -2150,7 +2155,6 @@ namespace mongo {
true /* fromMigrate */);
*lastOpApplied = ctx.getClient()->getLastOp().asDate();
- wunit.commit();
didAnything = true;
}
}
@@ -2180,7 +2184,6 @@ namespace mongo {
Helpers::upsert( txn, ns , it , true );
*lastOpApplied = cx.ctx().getClient()->getLastOp().asDate();
- cx.commit();
didAnything = true;
}
}