summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/fetch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/fetch.cpp')
-rw-r--r--src/mongo/db/exec/fetch.cpp370
1 files changed, 184 insertions, 186 deletions
diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp
index 817bc72fc8d..cab7655f2f0 100644
--- a/src/mongo/db/exec/fetch.cpp
+++ b/src/mongo/db/exec/fetch.cpp
@@ -41,216 +41,214 @@
namespace mongo {
- using std::unique_ptr;
- using std::vector;
-
- // static
- const char* FetchStage::kStageType = "FETCH";
-
- FetchStage::FetchStage(OperationContext* txn,
- WorkingSet* ws,
- PlanStage* child,
- const MatchExpression* filter,
- const Collection* collection)
- : _txn(txn),
- _collection(collection),
- _ws(ws),
- _child(child),
- _filter(filter),
- _idRetrying(WorkingSet::INVALID_ID),
- _commonStats(kStageType) { }
-
- FetchStage::~FetchStage() { }
-
- bool FetchStage::isEOF() {
- if (WorkingSet::INVALID_ID != _idRetrying) {
- // We asked the parent for a page-in, but still haven't had a chance to return the
- // paged in document
- return false;
- }
-
- return _child->isEOF();
+using std::unique_ptr;
+using std::vector;
+
+// static
+const char* FetchStage::kStageType = "FETCH";
+
+FetchStage::FetchStage(OperationContext* txn,
+ WorkingSet* ws,
+ PlanStage* child,
+ const MatchExpression* filter,
+ const Collection* collection)
+ : _txn(txn),
+ _collection(collection),
+ _ws(ws),
+ _child(child),
+ _filter(filter),
+ _idRetrying(WorkingSet::INVALID_ID),
+ _commonStats(kStageType) {}
+
+FetchStage::~FetchStage() {}
+
+bool FetchStage::isEOF() {
+ if (WorkingSet::INVALID_ID != _idRetrying) {
+ // We asked the parent for a page-in, but still haven't had a chance to return the
+ // paged in document
+ return false;
}
- PlanStage::StageState FetchStage::work(WorkingSetID* out) {
- ++_commonStats.works;
+ return _child->isEOF();
+}
- // Adds the amount of time taken by work() to executionTimeMillis.
- ScopedTimer timer(&_commonStats.executionTimeMillis);
+PlanStage::StageState FetchStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
- if (isEOF()) { return PlanStage::IS_EOF; }
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
- // Either retry the last WSM we worked on or get a new one from our child.
- WorkingSetID id;
- StageState status;
- if (_idRetrying == WorkingSet::INVALID_ID) {
- status = _child->work(&id);
- }
- else {
- status = ADVANCED;
- id = _idRetrying;
- _idRetrying = WorkingSet::INVALID_ID;
- }
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
+ }
- if (PlanStage::ADVANCED == status) {
- WorkingSetMember* member = _ws->get(id);
+ // Either retry the last WSM we worked on or get a new one from our child.
+ WorkingSetID id;
+ StageState status;
+ if (_idRetrying == WorkingSet::INVALID_ID) {
+ status = _child->work(&id);
+ } else {
+ status = ADVANCED;
+ id = _idRetrying;
+ _idRetrying = WorkingSet::INVALID_ID;
+ }
- // If there's an obj there, there is no fetching to perform.
- if (member->hasObj()) {
- ++_specificStats.alreadyHasObj;
- }
- else {
- // We need a valid loc to fetch from and this is the only state that has one.
- verify(WorkingSetMember::LOC_AND_IDX == member->state);
- verify(member->hasLoc());
-
- try {
- if (!_cursor) _cursor = _collection->getCursor(_txn);
-
- if (auto fetcher = _cursor->fetcherForId(member->loc)) {
- // There's something to fetch. Hand the fetcher off to the WSM, and pass up
- // a fetch request.
- _idRetrying = id;
- member->setFetcher(fetcher.release());
- *out = id;
- _commonStats.needYield++;
- return NEED_YIELD;
- }
-
- // The doc is already in memory, so go ahead and grab it. Now we have a RecordId
- // as well as an unowned object
- if (!WorkingSetCommon::fetch(_txn, member, _cursor)) {
- _ws->free(id);
- _commonStats.needTime++;
- return NEED_TIME;
- }
- }
- catch (const WriteConflictException& wce) {
+ if (PlanStage::ADVANCED == status) {
+ WorkingSetMember* member = _ws->get(id);
+
+ // If there's an obj there, there is no fetching to perform.
+ if (member->hasObj()) {
+ ++_specificStats.alreadyHasObj;
+ } else {
+ // We need a valid loc to fetch from and this is the only state that has one.
+ verify(WorkingSetMember::LOC_AND_IDX == member->state);
+ verify(member->hasLoc());
+
+ try {
+ if (!_cursor)
+ _cursor = _collection->getCursor(_txn);
+
+ if (auto fetcher = _cursor->fetcherForId(member->loc)) {
+ // There's something to fetch. Hand the fetcher off to the WSM, and pass up
+ // a fetch request.
_idRetrying = id;
- *out = WorkingSet::INVALID_ID;
+ member->setFetcher(fetcher.release());
+ *out = id;
_commonStats.needYield++;
return NEED_YIELD;
}
- }
- return returnIfMatches(member, id, out);
- }
- else if (PlanStage::FAILURE == status || PlanStage::DEAD == status) {
- *out = id;
- // If a stage fails, it may create a status WSM to indicate why it
- // failed, in which case 'id' is valid. If ID is invalid, we
- // create our own error message.
- if (WorkingSet::INVALID_ID == id) {
- mongoutils::str::stream ss;
- ss << "fetch stage failed to read in results from child";
- Status status(ErrorCodes::InternalError, ss);
- *out = WorkingSetCommon::allocateStatusMember( _ws, status);
+ // The doc is already in memory, so go ahead and grab it. Now we have a RecordId
+ // as well as an unowned object
+ if (!WorkingSetCommon::fetch(_txn, member, _cursor)) {
+ _ws->free(id);
+ _commonStats.needTime++;
+ return NEED_TIME;
+ }
+ } catch (const WriteConflictException& wce) {
+ _idRetrying = id;
+ *out = WorkingSet::INVALID_ID;
+ _commonStats.needYield++;
+ return NEED_YIELD;
}
- return status;
- }
- else if (PlanStage::NEED_TIME == status) {
- ++_commonStats.needTime;
- }
- else if (PlanStage::NEED_YIELD == status) {
- ++_commonStats.needYield;
- *out = id;
}
+ return returnIfMatches(member, id, out);
+ } else if (PlanStage::FAILURE == status || PlanStage::DEAD == status) {
+ *out = id;
+ // If a stage fails, it may create a status WSM to indicate why it
+ // failed, in which case 'id' is valid. If ID is invalid, we
+ // create our own error message.
+ if (WorkingSet::INVALID_ID == id) {
+ mongoutils::str::stream ss;
+ ss << "fetch stage failed to read in results from child";
+ Status status(ErrorCodes::InternalError, ss);
+ *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+ }
return status;
+ } else if (PlanStage::NEED_TIME == status) {
+ ++_commonStats.needTime;
+ } else if (PlanStage::NEED_YIELD == status) {
+ ++_commonStats.needYield;
+ *out = id;
}
- void FetchStage::saveState() {
- _txn = NULL;
- ++_commonStats.yields;
- if (_cursor) _cursor->saveUnpositioned();
- _child->saveState();
- }
-
- void FetchStage::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
- ++_commonStats.unyields;
- if (_cursor) _cursor->restore(opCtx);
- _child->restoreState(opCtx);
- }
-
- void FetchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
- ++_commonStats.invalidates;
-
- _child->invalidate(txn, dl, type);
-
- // It's possible that the loc getting invalidated is the one we're about to
- // fetch. In this case we do a "forced fetch" and put the WSM in owned object state.
- if (WorkingSet::INVALID_ID != _idRetrying) {
- WorkingSetMember* member = _ws->get(_idRetrying);
- if (member->hasLoc() && (member->loc == dl)) {
- // Fetch it now and kill the diskloc.
- WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
- }
+ return status;
+}
+
+void FetchStage::saveState() {
+ _txn = NULL;
+ ++_commonStats.yields;
+ if (_cursor)
+ _cursor->saveUnpositioned();
+ _child->saveState();
+}
+
+void FetchStage::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ ++_commonStats.unyields;
+ if (_cursor)
+ _cursor->restore(opCtx);
+ _child->restoreState(opCtx);
+}
+
+void FetchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+
+ _child->invalidate(txn, dl, type);
+
+ // It's possible that the loc getting invalidated is the one we're about to
+ // fetch. In this case we do a "forced fetch" and put the WSM in owned object state.
+ if (WorkingSet::INVALID_ID != _idRetrying) {
+ WorkingSetMember* member = _ws->get(_idRetrying);
+ if (member->hasLoc() && (member->loc == dl)) {
+ // Fetch it now and kill the diskloc.
+ WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
}
}
-
- PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member,
- WorkingSetID memberID,
- WorkingSetID* out) {
- // We consider "examining a document" to be every time that we pass a document through
- // a filter by calling Filter::passes(...) below. Therefore, the 'docsExamined' metric
- // is not always equal to the number of documents that were fetched from the collection.
- // In particular, we can sometimes generate plans which have two fetch stages. The first
- // one actually grabs the document from the collection, and the second passes the
- // document through a second filter.
- //
- // One common example of this is geoNear. Suppose that a geoNear plan is searching an
- // annulus to find 2dsphere-indexed documents near some point (x, y) on the globe.
- // After fetching documents within geo hashes that intersect this annulus, the docs are
- // fetched and filtered to make sure that they really do fall into this annulus. However,
- // the user might also want to find only those documents for which accommodationType==
- // "restaurant". The planner will add a second fetch stage to filter by this non-geo
- // predicate.
- ++_specificStats.docsExamined;
-
- if (Filter::passes(member, _filter)) {
- *out = memberID;
-
- ++_commonStats.advanced;
- return PlanStage::ADVANCED;
- }
- else {
- _ws->free(memberID);
-
- ++_commonStats.needTime;
- return PlanStage::NEED_TIME;
- }
+}
+
+PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member,
+ WorkingSetID memberID,
+ WorkingSetID* out) {
+ // We consider "examining a document" to be every time that we pass a document through
+ // a filter by calling Filter::passes(...) below. Therefore, the 'docsExamined' metric
+ // is not always equal to the number of documents that were fetched from the collection.
+ // In particular, we can sometimes generate plans which have two fetch stages. The first
+ // one actually grabs the document from the collection, and the second passes the
+ // document through a second filter.
+ //
+ // One common example of this is geoNear. Suppose that a geoNear plan is searching an
+ // annulus to find 2dsphere-indexed documents near some point (x, y) on the globe.
+ // After fetching documents within geo hashes that intersect this annulus, the docs are
+ // fetched and filtered to make sure that they really do fall into this annulus. However,
+ // the user might also want to find only those documents for which accommodationType==
+ // "restaurant". The planner will add a second fetch stage to filter by this non-geo
+ // predicate.
+ ++_specificStats.docsExamined;
+
+ if (Filter::passes(member, _filter)) {
+ *out = memberID;
+
+ ++_commonStats.advanced;
+ return PlanStage::ADVANCED;
+ } else {
+ _ws->free(memberID);
+
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
}
-
- vector<PlanStage*> FetchStage::getChildren() const {
- vector<PlanStage*> children;
- children.push_back(_child.get());
- return children;
+}
+
+vector<PlanStage*> FetchStage::getChildren() const {
+ vector<PlanStage*> children;
+ children.push_back(_child.get());
+ return children;
+}
+
+PlanStageStats* FetchStage::getStats() {
+ _commonStats.isEOF = isEOF();
+
+ // Add a BSON representation of the filter to the stats tree, if there is one.
+ if (NULL != _filter) {
+ BSONObjBuilder bob;
+ _filter->toBSON(&bob);
+ _commonStats.filter = bob.obj();
}
- PlanStageStats* FetchStage::getStats() {
- _commonStats.isEOF = isEOF();
-
- // Add a BSON representation of the filter to the stats tree, if there is one.
- if (NULL != _filter) {
- BSONObjBuilder bob;
- _filter->toBSON(&bob);
- _commonStats.filter = bob.obj();
- }
-
- unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_FETCH));
- ret->specific.reset(new FetchStats(_specificStats));
- ret->children.push_back(_child->getStats());
- return ret.release();
- }
+ unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_FETCH));
+ ret->specific.reset(new FetchStats(_specificStats));
+ ret->children.push_back(_child->getStats());
+ return ret.release();
+}
- const CommonStats* FetchStage::getCommonStats() const {
- return &_commonStats;
- }
+const CommonStats* FetchStage::getCommonStats() const {
+ return &_commonStats;
+}
- const SpecificStats* FetchStage::getSpecificStats() const {
- return &_specificStats;
- }
+const SpecificStats* FetchStage::getSpecificStats() const {
+ return &_specificStats;
+}
} // namespace mongo