Diffstat (limited to 'src/mongo/db/query/plan_executor.cpp')
-rw-r--r--	src/mongo/db/query/plan_executor.cpp	837
1 files changed, 411 insertions, 426 deletions
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 24b01bb704e..f234948fe50 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -48,513 +48,498 @@
namespace mongo {
- using std::shared_ptr;
- using std::string;
- using std::vector;
-
- namespace {
-
- /**
- * Retrieves the first stage of a given type from the plan tree, or NULL
- * if no such stage is found.
- */
- PlanStage* getStageByType(PlanStage* root, StageType type) {
- if (root->stageType() == type) {
- return root;
- }
+using std::shared_ptr;
+using std::string;
+using std::vector;
- vector<PlanStage*> children = root->getChildren();
- for (size_t i = 0; i < children.size(); i++) {
- PlanStage* result = getStageByType(children[i], type);
- if (result) {
- return result;
- }
- }
-
- return NULL;
- }
+namespace {
+/**
+ * Retrieves the first stage of a given type from the plan tree, or NULL
+ * if no such stage is found.
+ */
+PlanStage* getStageByType(PlanStage* root, StageType type) {
+ if (root->stageType() == type) {
+ return root;
}
- // static
- Status PlanExecutor::make(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- const Collection* collection,
- YieldPolicy yieldPolicy,
- PlanExecutor** out) {
- return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, collection, "", yieldPolicy, out);
+ vector<PlanStage*> children = root->getChildren();
+ for (size_t i = 0; i < children.size(); i++) {
+ PlanStage* result = getStageByType(children[i], type);
+ if (result) {
+ return result;
+ }
}
- // static
- Status PlanExecutor::make(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- const std::string& ns,
- YieldPolicy yieldPolicy,
- PlanExecutor** out) {
- return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, NULL, ns, yieldPolicy, out);
+ return NULL;
+}
+}
+
+// static
+Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, collection, "", yieldPolicy, out);
+}
+
+// static
+Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, NULL, NULL, ns, yieldPolicy, out);
+}
+
+// static
+Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, NULL, cq, collection, "", yieldPolicy, out);
+}
+
+// static
+Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ return PlanExecutor::make(opCtx, ws, rt, qs, cq, collection, "", yieldPolicy, out);
+}
+
+// static
+Status PlanExecutor::make(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ const std::string& ns,
+ YieldPolicy yieldPolicy,
+ PlanExecutor** out) {
+ std::unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, ws, rt, qs, cq, collection, ns));
+
+ // Perform plan selection, if necessary.
+ Status status = exec->pickBestPlan(yieldPolicy);
+ if (!status.isOK()) {
+ return status;
}
- // static
- Status PlanExecutor::make(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- CanonicalQuery* cq,
- const Collection* collection,
- YieldPolicy yieldPolicy,
- PlanExecutor** out) {
- return PlanExecutor::make(opCtx, ws, rt, NULL, cq, collection, "", yieldPolicy, out);
+ *out = exec.release();
+ return Status::OK();
+}
+
+PlanExecutor::PlanExecutor(OperationContext* opCtx,
+ WorkingSet* ws,
+ PlanStage* rt,
+ QuerySolution* qs,
+ CanonicalQuery* cq,
+ const Collection* collection,
+ const std::string& ns)
+ : _opCtx(opCtx),
+ _collection(collection),
+ _cq(cq),
+ _workingSet(ws),
+ _qs(qs),
+ _root(rt),
+ _ns(ns),
+ _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
+ // We may still need to initialize _ns from either _collection or _cq.
+ if (!_ns.empty()) {
+ // We already have an _ns set, so there's nothing more to do.
+ return;
}
- // static
- Status PlanExecutor::make(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- QuerySolution* qs,
- CanonicalQuery* cq,
- const Collection* collection,
- YieldPolicy yieldPolicy,
- PlanExecutor** out) {
- return PlanExecutor::make(opCtx, ws, rt, qs, cq, collection, "", yieldPolicy, out);
+ if (NULL != _collection) {
+ _ns = _collection->ns().ns();
+ } else {
+ invariant(NULL != _cq.get());
+ _ns = _cq->getParsed().ns();
}
-
- // static
- Status PlanExecutor::make(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- QuerySolution* qs,
- CanonicalQuery* cq,
- const Collection* collection,
- const std::string& ns,
- YieldPolicy yieldPolicy,
- PlanExecutor** out) {
- std::unique_ptr<PlanExecutor> exec(new PlanExecutor(opCtx, ws, rt, qs, cq, collection, ns));
-
- // Perform plan selection, if necessary.
- Status status = exec->pickBestPlan(yieldPolicy);
- if (!status.isOK()) {
- return status;
- }
-
- *out = exec.release();
- return Status::OK();
+}
+
+Status PlanExecutor::pickBestPlan(YieldPolicy policy) {
+ // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
+ // register it to receive notifications.
+ this->setYieldPolicy(policy);
+
+ // First check if we need to do subplanning.
+ PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN);
+ if (foundStage) {
+ SubplanStage* subplan = static_cast<SubplanStage*>(foundStage);
+ return subplan->pickBestPlan(_yieldPolicy.get());
}
- PlanExecutor::PlanExecutor(OperationContext* opCtx,
- WorkingSet* ws,
- PlanStage* rt,
- QuerySolution* qs,
- CanonicalQuery* cq,
- const Collection* collection,
- const std::string& ns)
- : _opCtx(opCtx),
- _collection(collection),
- _cq(cq),
- _workingSet(ws),
- _qs(qs),
- _root(rt),
- _ns(ns),
- _yieldPolicy(new PlanYieldPolicy(this, YIELD_MANUAL)) {
- // We may still need to initialize _ns from either _collection or _cq.
- if (!_ns.empty()) {
- // We already have an _ns set, so there's nothing more to do.
- return;
- }
+ // If we didn't have to do subplanning, we might still have to do regular
+ // multi plan selection...
+ foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN);
+ if (foundStage) {
+ MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage);
+ return mps->pickBestPlan(_yieldPolicy.get());
+ }
- if (NULL != _collection) {
- _ns = _collection->ns().ns();
- }
- else {
- invariant(NULL != _cq.get());
- _ns = _cq->getParsed().ns();
- }
+ // ...or, we might have to run a plan from the cache for a trial period, falling back on
+ // regular planning if the cached plan performs poorly.
+ foundStage = getStageByType(_root.get(), STAGE_CACHED_PLAN);
+ if (foundStage) {
+ CachedPlanStage* cachedPlan = static_cast<CachedPlanStage*>(foundStage);
+ return cachedPlan->pickBestPlan(_yieldPolicy.get());
}
- Status PlanExecutor::pickBestPlan(YieldPolicy policy) {
- // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
- // register it to receive notifications.
- this->setYieldPolicy(policy);
+ // Either we chose a plan, or no plan selection was required. In both cases,
+ // our work has been successfully completed.
+ return Status::OK();
+}
+
+PlanExecutor::~PlanExecutor() {}
+
+// static
+std::string PlanExecutor::statestr(ExecState s) {
+ if (PlanExecutor::ADVANCED == s) {
+ return "ADVANCED";
+ } else if (PlanExecutor::IS_EOF == s) {
+ return "IS_EOF";
+ } else if (PlanExecutor::DEAD == s) {
+ return "DEAD";
+ } else {
+ verify(PlanExecutor::FAILURE == s);
+ return "FAILURE";
+ }
+}
- // First check if we need to do subplanning.
- PlanStage* foundStage = getStageByType(_root.get(), STAGE_SUBPLAN);
- if (foundStage) {
- SubplanStage* subplan = static_cast<SubplanStage*>(foundStage);
- return subplan->pickBestPlan(_yieldPolicy.get());
- }
+WorkingSet* PlanExecutor::getWorkingSet() const {
+ return _workingSet.get();
+}
- // If we didn't have to do subplanning, we might still have to do regular
- // multi plan selection...
- foundStage = getStageByType(_root.get(), STAGE_MULTI_PLAN);
- if (foundStage) {
- MultiPlanStage* mps = static_cast<MultiPlanStage*>(foundStage);
- return mps->pickBestPlan(_yieldPolicy.get());
- }
+PlanStage* PlanExecutor::getRootStage() const {
+ return _root.get();
+}
- // ...or, we might have to run a plan from the cache for a trial period, falling back on
- // regular planning if the cached plan performs poorly.
- foundStage = getStageByType(_root.get(), STAGE_CACHED_PLAN);
- if (foundStage) {
- CachedPlanStage* cachedPlan = static_cast<CachedPlanStage*>(foundStage);
- return cachedPlan->pickBestPlan(_yieldPolicy.get());
- }
+CanonicalQuery* PlanExecutor::getCanonicalQuery() const {
+ return _cq.get();
+}
- // Either we chose a plan, or no plan selection was required. In both cases,
- // our work has been successfully completed.
- return Status::OK();
- }
+PlanStageStats* PlanExecutor::getStats() const {
+ return _root->getStats();
+}
- PlanExecutor::~PlanExecutor() { }
+const Collection* PlanExecutor::collection() const {
+ return _collection;
+}
- // static
- std::string PlanExecutor::statestr(ExecState s) {
- if (PlanExecutor::ADVANCED == s) {
- return "ADVANCED";
- }
- else if (PlanExecutor::IS_EOF == s) {
- return "IS_EOF";
- }
- else if (PlanExecutor::DEAD == s) {
- return "DEAD";
- }
- else {
- verify(PlanExecutor::FAILURE == s);
- return "FAILURE";
- }
- }
+OperationContext* PlanExecutor::getOpCtx() const {
+ return _opCtx;
+}
- WorkingSet* PlanExecutor::getWorkingSet() const {
- return _workingSet.get();
+void PlanExecutor::saveState() {
+ if (!killed()) {
+ _root->saveState();
}
- PlanStage* PlanExecutor::getRootStage() const {
- return _root.get();
+ // Doc-locking storage engines drop their transactional context after saving state.
+ // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear,
+ // mergeSort, sort) which are no longer protected by the storage engine's transactional
+ // boundaries. Force-fetch the documents for any such record ids so that we have our
+ // own copy in the working set.
+ if (supportsDocLocking()) {
+ WorkingSetCommon::prepareForSnapshotChange(_workingSet.get());
}
- CanonicalQuery* PlanExecutor::getCanonicalQuery() const {
- return _cq.get();
- }
+ _opCtx = NULL;
+}
- PlanStageStats* PlanExecutor::getStats() const {
- return _root->getStats();
- }
+bool PlanExecutor::restoreState(OperationContext* opCtx) {
+ try {
+ return restoreStateWithoutRetrying(opCtx);
+ } catch (const WriteConflictException& wce) {
+ if (!_yieldPolicy->allowedToYield())
+ throw;
- const Collection* PlanExecutor::collection() const {
- return _collection;
+ // Handles retries by calling restoreStateWithoutRetrying() in a loop.
+ return _yieldPolicy->yield(NULL);
}
+}
- OperationContext* PlanExecutor::getOpCtx() const {
- return _opCtx;
- }
+bool PlanExecutor::restoreStateWithoutRetrying(OperationContext* opCtx) {
+ invariant(NULL == _opCtx);
+ invariant(opCtx);
- void PlanExecutor::saveState() {
- if (!killed()) {
- _root->saveState();
- }
+ _opCtx = opCtx;
- // Doc-locking storage engines drop their transactional context after saving state.
- // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear,
- // mergeSort, sort) which are no longer protected by the storage engine's transactional
- // boundaries. Force-fetch the documents for any such record ids so that we have our
- // own copy in the working set.
- if (supportsDocLocking()) {
- WorkingSetCommon::prepareForSnapshotChange(_workingSet.get());
- }
+ // We're restoring after a yield or getMore now. If we're a yielding plan executor, reset
+ // the yield timer in order to prevent from yielding again right away.
+ _yieldPolicy->resetTimer();
- _opCtx = NULL;
+ if (!killed()) {
+ _root->restoreState(opCtx);
}
- bool PlanExecutor::restoreState(OperationContext* opCtx) {
- try {
- return restoreStateWithoutRetrying(opCtx);
- }
- catch (const WriteConflictException& wce) {
- if (!_yieldPolicy->allowedToYield())
- throw;
+ return !killed();
+}
- // Handles retries by calling restoreStateWithoutRetrying() in a loop.
- return _yieldPolicy->yield(NULL);
- }
+void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ if (!killed()) {
+ _root->invalidate(txn, dl, type);
}
+}
- bool PlanExecutor::restoreStateWithoutRetrying(OperationContext* opCtx) {
- invariant(NULL == _opCtx);
- invariant(opCtx);
+PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
+ Snapshotted<BSONObj> snapshotted;
+ ExecState state = getNextSnapshotted(objOut ? &snapshotted : NULL, dlOut);
- _opCtx = opCtx;
-
- // We're restoring after a yield or getMore now. If we're a yielding plan executor, reset
- // the yield timer in order to prevent from yielding again right away.
- _yieldPolicy->resetTimer();
-
- if (!killed()) {
- _root->restoreState(opCtx);
- }
-
- return !killed();
- }
-
- void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
- if (!killed()) { _root->invalidate(txn, dl, type); }
+ if (objOut) {
+ *objOut = snapshotted.value();
}
- PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
- Snapshotted<BSONObj> snapshotted;
- ExecState state = getNextSnapshotted(objOut ? &snapshotted : NULL, dlOut);
-
- if (objOut) {
- *objOut = snapshotted.value();
+ return state;
+}
+
+PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
+ RecordId* dlOut) {
+ if (killed()) {
+ if (NULL != objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(status));
}
-
- return state;
+ return PlanExecutor::DEAD;
}
- PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
- RecordId* dlOut) {
- if (killed()) {
- if (NULL != objOut) {
- Status status(ErrorCodes::OperationFailed,
- str::stream() << "Operation aborted because: " << *_killReason);
- *objOut = Snapshotted<BSONObj>(SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(status));
- }
- return PlanExecutor::DEAD;
- }
-
- if (!_stash.empty()) {
- invariant(objOut && !dlOut);
- *objOut = {SnapshotId(), _stash.front()};
- _stash.pop();
- return PlanExecutor::ADVANCED;
- }
+ if (!_stash.empty()) {
+ invariant(objOut && !dlOut);
+ *objOut = {SnapshotId(), _stash.front()};
+ _stash.pop();
+ return PlanExecutor::ADVANCED;
+ }
- // When a stage requests a yield for document fetch, it gives us back a RecordFetcher*
- // to use to pull the record into memory. We take ownership of the RecordFetcher here,
- // deleting it after we've had a chance to do the fetch. For timing-based yields, we
- // just pass a NULL fetcher.
- std::unique_ptr<RecordFetcher> fetcher;
-
- // Incremented on every writeConflict, reset to 0 on any successful call to _root->work.
- size_t writeConflictsInARow = 0;
-
- for (;;) {
- // These are the conditions which can cause us to yield:
- // 1) The yield policy's timer elapsed, or
- // 2) some stage requested a yield due to a document fetch, or
- // 3) we need to yield and retry due to a WriteConflictException.
- // In all cases, the actual yielding happens here.
- if (_yieldPolicy->shouldYield()) {
- _yieldPolicy->yield(fetcher.get());
-
- if (killed()) {
- if (NULL != objOut) {
- Status status(ErrorCodes::OperationFailed,
- str::stream() << "Operation aborted because: "
- << *_killReason);
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(status));
- }
- return PlanExecutor::DEAD;
+ // When a stage requests a yield for document fetch, it gives us back a RecordFetcher*
+ // to use to pull the record into memory. We take ownership of the RecordFetcher here,
+ // deleting it after we've had a chance to do the fetch. For timing-based yields, we
+ // just pass a NULL fetcher.
+ std::unique_ptr<RecordFetcher> fetcher;
+
+ // Incremented on every writeConflict, reset to 0 on any successful call to _root->work.
+ size_t writeConflictsInARow = 0;
+
+ for (;;) {
+ // These are the conditions which can cause us to yield:
+ // 1) The yield policy's timer elapsed, or
+ // 2) some stage requested a yield due to a document fetch, or
+ // 3) we need to yield and retry due to a WriteConflictException.
+ // In all cases, the actual yielding happens here.
+ if (_yieldPolicy->shouldYield()) {
+ _yieldPolicy->yield(fetcher.get());
+
+ if (killed()) {
+ if (NULL != objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
}
+ return PlanExecutor::DEAD;
}
+ }
- // We're done using the fetcher, so it should be freed. We don't want to
- // use the same RecordFetcher twice.
- fetcher.reset();
-
- WorkingSetID id = WorkingSet::INVALID_ID;
- PlanStage::StageState code = _root->work(&id);
+ // We're done using the fetcher, so it should be freed. We don't want to
+ // use the same RecordFetcher twice.
+ fetcher.reset();
- if (code != PlanStage::NEED_YIELD)
- writeConflictsInARow = 0;
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState code = _root->work(&id);
- if (PlanStage::ADVANCED == code) {
- // Fast count.
- if (WorkingSet::INVALID_ID == id) {
- invariant(NULL == objOut);
- invariant(NULL == dlOut);
- return PlanExecutor::ADVANCED;
- }
+ if (code != PlanStage::NEED_YIELD)
+ writeConflictsInARow = 0;
- WorkingSetMember* member = _workingSet->get(id);
- bool hasRequestedData = true;
+ if (PlanStage::ADVANCED == code) {
+ // Fast count.
+ if (WorkingSet::INVALID_ID == id) {
+ invariant(NULL == objOut);
+ invariant(NULL == dlOut);
+ return PlanExecutor::ADVANCED;
+ }
- if (NULL != objOut) {
- if (WorkingSetMember::LOC_AND_IDX == member->state) {
- if (1 != member->keyData.size()) {
- _workingSet->free(id);
- hasRequestedData = false;
- }
- else {
- // TODO: currently snapshot ids are only associated with documents, and
- // not with index keys.
- *objOut = Snapshotted<BSONObj>(SnapshotId(),
- member->keyData[0].keyData);
- }
- }
- else if (member->hasObj()) {
- *objOut = member->obj;
- }
- else {
- _workingSet->free(id);
- hasRequestedData = false;
- }
- }
+ WorkingSetMember* member = _workingSet->get(id);
+ bool hasRequestedData = true;
- if (NULL != dlOut) {
- if (member->hasLoc()) {
- *dlOut = member->loc;
- }
- else {
+ if (NULL != objOut) {
+ if (WorkingSetMember::LOC_AND_IDX == member->state) {
+ if (1 != member->keyData.size()) {
_workingSet->free(id);
hasRequestedData = false;
+ } else {
+ // TODO: currently snapshot ids are only associated with documents, and
+ // not with index keys.
+ *objOut = Snapshotted<BSONObj>(SnapshotId(), member->keyData[0].keyData);
}
- }
-
- if (hasRequestedData) {
+ } else if (member->hasObj()) {
+ *objOut = member->obj;
+ } else {
_workingSet->free(id);
- return PlanExecutor::ADVANCED;
+ hasRequestedData = false;
}
- // This result didn't have the data the caller wanted, try again.
}
- else if (PlanStage::NEED_YIELD == code) {
- if (id == WorkingSet::INVALID_ID) {
- if (!_yieldPolicy->allowedToYield()) throw WriteConflictException();
- CurOp::get(_opCtx)->debug().writeConflicts++;
- writeConflictsInARow++;
- WriteConflictException::logAndBackoff(writeConflictsInARow,
- "plan execution",
- _collection->ns().ns());
+ if (NULL != dlOut) {
+ if (member->hasLoc()) {
+ *dlOut = member->loc;
+ } else {
+ _workingSet->free(id);
+ hasRequestedData = false;
}
- else {
- WorkingSetMember* member = _workingSet->get(id);
- invariant(member->hasFetcher());
- // Transfer ownership of the fetcher. Next time around the loop a yield will
- // happen.
- fetcher.reset(member->releaseFetcher());
- }
-
- // If we're allowed to, we will yield next time through the loop.
- if (_yieldPolicy->allowedToYield()) _yieldPolicy->forceYield();
}
- else if (PlanStage::NEED_TIME == code) {
- // Fall through to yield check at end of large conditional.
+
+ if (hasRequestedData) {
+ _workingSet->free(id);
+ return PlanExecutor::ADVANCED;
}
- else if (PlanStage::IS_EOF == code) {
- return PlanExecutor::IS_EOF;
+ // This result didn't have the data the caller wanted, try again.
+ } else if (PlanStage::NEED_YIELD == code) {
+ if (id == WorkingSet::INVALID_ID) {
+ if (!_yieldPolicy->allowedToYield())
+ throw WriteConflictException();
+ CurOp::get(_opCtx)->debug().writeConflicts++;
+ writeConflictsInARow++;
+ WriteConflictException::logAndBackoff(
+ writeConflictsInARow, "plan execution", _collection->ns().ns());
+
+ } else {
+ WorkingSetMember* member = _workingSet->get(id);
+ invariant(member->hasFetcher());
+ // Transfer ownership of the fetcher. Next time around the loop a yield will
+ // happen.
+ fetcher.reset(member->releaseFetcher());
}
- else {
- invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
- if (NULL != objOut) {
- BSONObj statusObj;
- WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &statusObj);
- *objOut = Snapshotted<BSONObj>(SnapshotId(), statusObj);
- }
+ // If we're allowed to, we will yield next time through the loop.
+ if (_yieldPolicy->allowedToYield())
+ _yieldPolicy->forceYield();
+ } else if (PlanStage::NEED_TIME == code) {
+ // Fall through to yield check at end of large conditional.
+ } else if (PlanStage::IS_EOF == code) {
+ return PlanExecutor::IS_EOF;
+ } else {
+ invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
- return (PlanStage::DEAD == code) ? PlanExecutor::DEAD : PlanExecutor::FAILURE;
+ if (NULL != objOut) {
+ BSONObj statusObj;
+ WorkingSetCommon::getStatusMemberObject(*_workingSet, id, &statusObj);
+ *objOut = Snapshotted<BSONObj>(SnapshotId(), statusObj);
}
- }
- }
-
- bool PlanExecutor::isEOF() {
- return killed() || (_stash.empty() && _root->isEOF());
- }
- void PlanExecutor::registerExec() {
- _safety.reset(new ScopedExecutorRegistration(this));
- }
-
- void PlanExecutor::deregisterExec() {
- _safety.reset();
+ return (PlanStage::DEAD == code) ? PlanExecutor::DEAD : PlanExecutor::FAILURE;
+ }
}
-
- void PlanExecutor::kill(std::string reason) {
- _killReason = std::move(reason);
- _collection = NULL;
-
- // XXX: PlanExecutor is designed to wrap a single execution tree. In the case of
- // aggregation queries, PlanExecutor wraps a proxy stage responsible for pulling results
- // from an aggregation pipeline. The aggregation pipeline pulls results from yet another
- // PlanExecutor. Such nested PlanExecutors require us to manually propagate kill() to
- // the "inner" executor. This is bad, and hopefully can be fixed down the line with the
- // unification of agg and query.
- //
- // The CachedPlanStage is another special case. It needs to update the plan cache from
- // its destructor. It needs to know whether it has been killed so that it can avoid
- // touching a potentially invalid plan cache in this case.
- //
- // TODO: get rid of this code block.
- {
- PlanStage* foundStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY);
- if (foundStage) {
- PipelineProxyStage* proxyStage = static_cast<PipelineProxyStage*>(foundStage);
- shared_ptr<PlanExecutor> childExec = proxyStage->getChildExecutor();
- if (childExec) {
- childExec->kill(*_killReason);
- }
+}
+
+bool PlanExecutor::isEOF() {
+ return killed() || (_stash.empty() && _root->isEOF());
+}
+
+void PlanExecutor::registerExec() {
+ _safety.reset(new ScopedExecutorRegistration(this));
+}
+
+void PlanExecutor::deregisterExec() {
+ _safety.reset();
+}
+
+void PlanExecutor::kill(std::string reason) {
+ _killReason = std::move(reason);
+ _collection = NULL;
+
+ // XXX: PlanExecutor is designed to wrap a single execution tree. In the case of
+ // aggregation queries, PlanExecutor wraps a proxy stage responsible for pulling results
+ // from an aggregation pipeline. The aggregation pipeline pulls results from yet another
+ // PlanExecutor. Such nested PlanExecutors require us to manually propagate kill() to
+ // the "inner" executor. This is bad, and hopefully can be fixed down the line with the
+ // unification of agg and query.
+ //
+ // The CachedPlanStage is another special case. It needs to update the plan cache from
+ // its destructor. It needs to know whether it has been killed so that it can avoid
+ // touching a potentially invalid plan cache in this case.
+ //
+ // TODO: get rid of this code block.
+ {
+ PlanStage* foundStage = getStageByType(_root.get(), STAGE_PIPELINE_PROXY);
+ if (foundStage) {
+ PipelineProxyStage* proxyStage = static_cast<PipelineProxyStage*>(foundStage);
+ shared_ptr<PlanExecutor> childExec = proxyStage->getChildExecutor();
+ if (childExec) {
+ childExec->kill(*_killReason);
}
}
}
+}
- Status PlanExecutor::executePlan() {
- BSONObj obj;
- PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
- while (PlanExecutor::ADVANCED == state) {
- state = this->getNext(&obj, NULL);
- }
-
- if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "Exec error: " << WorkingSetCommon::toStatusString(obj)
- << ", state: " << PlanExecutor::statestr(state));
- }
-
- invariant(PlanExecutor::IS_EOF == state);
- return Status::OK();
+Status PlanExecutor::executePlan() {
+ BSONObj obj;
+ PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
+ while (PlanExecutor::ADVANCED == state) {
+ state = this->getNext(&obj, NULL);
}
- const string& PlanExecutor::ns() {
- return _ns;
+ if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Exec error: " << WorkingSetCommon::toStatusString(obj)
+ << ", state: " << PlanExecutor::statestr(state));
}
- void PlanExecutor::setYieldPolicy(YieldPolicy policy, bool registerExecutor) {
- _yieldPolicy->setPolicy(policy);
- if (PlanExecutor::YIELD_AUTO == policy) {
- // Runners that yield automatically generally need to be registered so that
- // after yielding, they receive notifications of events like deletions and
- // index drops. The only exception is that a few PlanExecutors get registered
- // by ClientCursor instead of being registered here. This is unneeded if we only do
- // partial "yields" for WriteConflict retrying.
- if (registerExecutor) {
- this->registerExec();
- }
+ invariant(PlanExecutor::IS_EOF == state);
+ return Status::OK();
+}
+
+const string& PlanExecutor::ns() {
+ return _ns;
+}
+
+void PlanExecutor::setYieldPolicy(YieldPolicy policy, bool registerExecutor) {
+ _yieldPolicy->setPolicy(policy);
+ if (PlanExecutor::YIELD_AUTO == policy) {
+ // Runners that yield automatically generally need to be registered so that
+ // after yielding, they receive notifications of events like deletions and
+ // index drops. The only exception is that a few PlanExecutors get registered
+ // by ClientCursor instead of being registered here. This is unneeded if we only do
+ // partial "yields" for WriteConflict retrying.
+ if (registerExecutor) {
+ this->registerExec();
}
}
-
- void PlanExecutor::enqueue(const BSONObj& obj) {
- _stash.push(obj.getOwned());
- }
-
- //
- // ScopedExecutorRegistration
- //
-
- PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec)
- : _exec(exec) {
- // Collection can be null for an EOFStage plan, or other places where registration
- // is not needed.
- if (_exec->collection()) {
- _exec->collection()->getCursorManager()->registerExecutor(exec);
- }
+}
+
+void PlanExecutor::enqueue(const BSONObj& obj) {
+ _stash.push(obj.getOwned());
+}
+
+//
+// ScopedExecutorRegistration
+//
+
+PlanExecutor::ScopedExecutorRegistration::ScopedExecutorRegistration(PlanExecutor* exec)
+ : _exec(exec) {
+ // Collection can be null for an EOFStage plan, or other places where registration
+ // is not needed.
+ if (_exec->collection()) {
+ _exec->collection()->getCursorManager()->registerExecutor(exec);
}
+}
- PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() {
- if (_exec->collection()) {
- _exec->collection()->getCursorManager()->deregisterExecutor(_exec);
- }
+PlanExecutor::ScopedExecutorRegistration::~ScopedExecutorRegistration() {
+ if (_exec->collection()) {
+ _exec->collection()->getCursorManager()->deregisterExecutor(_exec);
}
+}
-} // namespace mongo
+} // namespace mongo
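
A caller-side sketch, not part of this commit and abbreviated for illustration: constructing an executor through the four-argument make() overload above and draining it with getNext(), the same loop executePlan() runs internally. Names such as 'txn', 'ws', 'root', and 'collection' are assumed to have been set up elsewhere.

    // Hypothetical usage sketch; setup of txn/ws/root/collection is assumed.
    PlanExecutor* rawExec;
    Status status = PlanExecutor::make(txn, ws, root, collection,
                                       PlanExecutor::YIELD_AUTO, &rawExec);
    if (!status.isOK()) {
        return status;  // plan selection (pickBestPlan) failed inside make()
    }
    std::unique_ptr<PlanExecutor> exec(rawExec);  // take ownership of the out-param

    BSONObj obj;
    PlanExecutor::ExecState state;
    while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
        // Consume 'obj' here; getNext() copies the result out of the working set.
    }

    if (PlanExecutor::IS_EOF != state) {
        // DEAD or FAILURE: 'obj' holds a status document describing the error.
        return Status(ErrorCodes::OperationFailed,
                      "exec error: " + WorkingSetCommon::toStatusString(obj));
    }

Every make() overload funnels into the one that calls pickBestPlan(), so a failed subplan, multi-plan, or cached-plan selection surfaces as a bad Status from make() rather than on the first getNext().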