diff options
Diffstat (limited to 'src/mongo/db/exec/fetch.cpp')
-rw-r--r-- | src/mongo/db/exec/fetch.cpp | 370 |
1 files changed, 184 insertions, 186 deletions
diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp index 817bc72fc8d..cab7655f2f0 100644 --- a/src/mongo/db/exec/fetch.cpp +++ b/src/mongo/db/exec/fetch.cpp @@ -41,216 +41,214 @@ namespace mongo { - using std::unique_ptr; - using std::vector; - - // static - const char* FetchStage::kStageType = "FETCH"; - - FetchStage::FetchStage(OperationContext* txn, - WorkingSet* ws, - PlanStage* child, - const MatchExpression* filter, - const Collection* collection) - : _txn(txn), - _collection(collection), - _ws(ws), - _child(child), - _filter(filter), - _idRetrying(WorkingSet::INVALID_ID), - _commonStats(kStageType) { } - - FetchStage::~FetchStage() { } - - bool FetchStage::isEOF() { - if (WorkingSet::INVALID_ID != _idRetrying) { - // We asked the parent for a page-in, but still haven't had a chance to return the - // paged in document - return false; - } - - return _child->isEOF(); +using std::unique_ptr; +using std::vector; + +// static +const char* FetchStage::kStageType = "FETCH"; + +FetchStage::FetchStage(OperationContext* txn, + WorkingSet* ws, + PlanStage* child, + const MatchExpression* filter, + const Collection* collection) + : _txn(txn), + _collection(collection), + _ws(ws), + _child(child), + _filter(filter), + _idRetrying(WorkingSet::INVALID_ID), + _commonStats(kStageType) {} + +FetchStage::~FetchStage() {} + +bool FetchStage::isEOF() { + if (WorkingSet::INVALID_ID != _idRetrying) { + // We asked the parent for a page-in, but still haven't had a chance to return the + // paged in document + return false; } - PlanStage::StageState FetchStage::work(WorkingSetID* out) { - ++_commonStats.works; + return _child->isEOF(); +} - // Adds the amount of time taken by work() to executionTimeMillis. - ScopedTimer timer(&_commonStats.executionTimeMillis); +PlanStage::StageState FetchStage::work(WorkingSetID* out) { + ++_commonStats.works; - if (isEOF()) { return PlanStage::IS_EOF; } + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_commonStats.executionTimeMillis); - // Either retry the last WSM we worked on or get a new one from our child. - WorkingSetID id; - StageState status; - if (_idRetrying == WorkingSet::INVALID_ID) { - status = _child->work(&id); - } - else { - status = ADVANCED; - id = _idRetrying; - _idRetrying = WorkingSet::INVALID_ID; - } + if (isEOF()) { + return PlanStage::IS_EOF; + } - if (PlanStage::ADVANCED == status) { - WorkingSetMember* member = _ws->get(id); + // Either retry the last WSM we worked on or get a new one from our child. + WorkingSetID id; + StageState status; + if (_idRetrying == WorkingSet::INVALID_ID) { + status = _child->work(&id); + } else { + status = ADVANCED; + id = _idRetrying; + _idRetrying = WorkingSet::INVALID_ID; + } - // If there's an obj there, there is no fetching to perform. - if (member->hasObj()) { - ++_specificStats.alreadyHasObj; - } - else { - // We need a valid loc to fetch from and this is the only state that has one. - verify(WorkingSetMember::LOC_AND_IDX == member->state); - verify(member->hasLoc()); - - try { - if (!_cursor) _cursor = _collection->getCursor(_txn); - - if (auto fetcher = _cursor->fetcherForId(member->loc)) { - // There's something to fetch. Hand the fetcher off to the WSM, and pass up - // a fetch request. - _idRetrying = id; - member->setFetcher(fetcher.release()); - *out = id; - _commonStats.needYield++; - return NEED_YIELD; - } - - // The doc is already in memory, so go ahead and grab it. Now we have a RecordId - // as well as an unowned object - if (!WorkingSetCommon::fetch(_txn, member, _cursor)) { - _ws->free(id); - _commonStats.needTime++; - return NEED_TIME; - } - } - catch (const WriteConflictException& wce) { + if (PlanStage::ADVANCED == status) { + WorkingSetMember* member = _ws->get(id); + + // If there's an obj there, there is no fetching to perform. + if (member->hasObj()) { + ++_specificStats.alreadyHasObj; + } else { + // We need a valid loc to fetch from and this is the only state that has one. + verify(WorkingSetMember::LOC_AND_IDX == member->state); + verify(member->hasLoc()); + + try { + if (!_cursor) + _cursor = _collection->getCursor(_txn); + + if (auto fetcher = _cursor->fetcherForId(member->loc)) { + // There's something to fetch. Hand the fetcher off to the WSM, and pass up + // a fetch request. _idRetrying = id; - *out = WorkingSet::INVALID_ID; + member->setFetcher(fetcher.release()); + *out = id; _commonStats.needYield++; return NEED_YIELD; } - } - return returnIfMatches(member, id, out); - } - else if (PlanStage::FAILURE == status || PlanStage::DEAD == status) { - *out = id; - // If a stage fails, it may create a status WSM to indicate why it - // failed, in which case 'id' is valid. If ID is invalid, we - // create our own error message. - if (WorkingSet::INVALID_ID == id) { - mongoutils::str::stream ss; - ss << "fetch stage failed to read in results from child"; - Status status(ErrorCodes::InternalError, ss); - *out = WorkingSetCommon::allocateStatusMember( _ws, status); + // The doc is already in memory, so go ahead and grab it. Now we have a RecordId + // as well as an unowned object + if (!WorkingSetCommon::fetch(_txn, member, _cursor)) { + _ws->free(id); + _commonStats.needTime++; + return NEED_TIME; + } + } catch (const WriteConflictException& wce) { + _idRetrying = id; + *out = WorkingSet::INVALID_ID; + _commonStats.needYield++; + return NEED_YIELD; } - return status; - } - else if (PlanStage::NEED_TIME == status) { - ++_commonStats.needTime; - } - else if (PlanStage::NEED_YIELD == status) { - ++_commonStats.needYield; - *out = id; } + return returnIfMatches(member, id, out); + } else if (PlanStage::FAILURE == status || PlanStage::DEAD == status) { + *out = id; + // If a stage fails, it may create a status WSM to indicate why it + // failed, in which case 'id' is valid. If ID is invalid, we + // create our own error message. + if (WorkingSet::INVALID_ID == id) { + mongoutils::str::stream ss; + ss << "fetch stage failed to read in results from child"; + Status status(ErrorCodes::InternalError, ss); + *out = WorkingSetCommon::allocateStatusMember(_ws, status); + } return status; + } else if (PlanStage::NEED_TIME == status) { + ++_commonStats.needTime; + } else if (PlanStage::NEED_YIELD == status) { + ++_commonStats.needYield; + *out = id; } - void FetchStage::saveState() { - _txn = NULL; - ++_commonStats.yields; - if (_cursor) _cursor->saveUnpositioned(); - _child->saveState(); - } - - void FetchStage::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - ++_commonStats.unyields; - if (_cursor) _cursor->restore(opCtx); - _child->restoreState(opCtx); - } - - void FetchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { - ++_commonStats.invalidates; - - _child->invalidate(txn, dl, type); - - // It's possible that the loc getting invalidated is the one we're about to - // fetch. In this case we do a "forced fetch" and put the WSM in owned object state. - if (WorkingSet::INVALID_ID != _idRetrying) { - WorkingSetMember* member = _ws->get(_idRetrying); - if (member->hasLoc() && (member->loc == dl)) { - // Fetch it now and kill the diskloc. - WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); - } + return status; +} + +void FetchStage::saveState() { + _txn = NULL; + ++_commonStats.yields; + if (_cursor) + _cursor->saveUnpositioned(); + _child->saveState(); +} + +void FetchStage::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + ++_commonStats.unyields; + if (_cursor) + _cursor->restore(opCtx); + _child->restoreState(opCtx); +} + +void FetchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { + ++_commonStats.invalidates; + + _child->invalidate(txn, dl, type); + + // It's possible that the loc getting invalidated is the one we're about to + // fetch. In this case we do a "forced fetch" and put the WSM in owned object state. + if (WorkingSet::INVALID_ID != _idRetrying) { + WorkingSetMember* member = _ws->get(_idRetrying); + if (member->hasLoc() && (member->loc == dl)) { + // Fetch it now and kill the diskloc. + WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); } } - - PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member, - WorkingSetID memberID, - WorkingSetID* out) { - // We consider "examining a document" to be every time that we pass a document through - // a filter by calling Filter::passes(...) below. Therefore, the 'docsExamined' metric - // is not always equal to the number of documents that were fetched from the collection. - // In particular, we can sometimes generate plans which have two fetch stages. The first - // one actually grabs the document from the collection, and the second passes the - // document through a second filter. - // - // One common example of this is geoNear. Suppose that a geoNear plan is searching an - // annulus to find 2dsphere-indexed documents near some point (x, y) on the globe. - // After fetching documents within geo hashes that intersect this annulus, the docs are - // fetched and filtered to make sure that they really do fall into this annulus. However, - // the user might also want to find only those documents for which accommodationType== - // "restaurant". The planner will add a second fetch stage to filter by this non-geo - // predicate. - ++_specificStats.docsExamined; - - if (Filter::passes(member, _filter)) { - *out = memberID; - - ++_commonStats.advanced; - return PlanStage::ADVANCED; - } - else { - _ws->free(memberID); - - ++_commonStats.needTime; - return PlanStage::NEED_TIME; - } +} + +PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member, + WorkingSetID memberID, + WorkingSetID* out) { + // We consider "examining a document" to be every time that we pass a document through + // a filter by calling Filter::passes(...) below. Therefore, the 'docsExamined' metric + // is not always equal to the number of documents that were fetched from the collection. + // In particular, we can sometimes generate plans which have two fetch stages. The first + // one actually grabs the document from the collection, and the second passes the + // document through a second filter. + // + // One common example of this is geoNear. Suppose that a geoNear plan is searching an + // annulus to find 2dsphere-indexed documents near some point (x, y) on the globe. + // After fetching documents within geo hashes that intersect this annulus, the docs are + // fetched and filtered to make sure that they really do fall into this annulus. However, + // the user might also want to find only those documents for which accommodationType== + // "restaurant". The planner will add a second fetch stage to filter by this non-geo + // predicate. + ++_specificStats.docsExamined; + + if (Filter::passes(member, _filter)) { + *out = memberID; + + ++_commonStats.advanced; + return PlanStage::ADVANCED; + } else { + _ws->free(memberID); + + ++_commonStats.needTime; + return PlanStage::NEED_TIME; } - - vector<PlanStage*> FetchStage::getChildren() const { - vector<PlanStage*> children; - children.push_back(_child.get()); - return children; +} + +vector<PlanStage*> FetchStage::getChildren() const { + vector<PlanStage*> children; + children.push_back(_child.get()); + return children; +} + +PlanStageStats* FetchStage::getStats() { + _commonStats.isEOF = isEOF(); + + // Add a BSON representation of the filter to the stats tree, if there is one. + if (NULL != _filter) { + BSONObjBuilder bob; + _filter->toBSON(&bob); + _commonStats.filter = bob.obj(); } - PlanStageStats* FetchStage::getStats() { - _commonStats.isEOF = isEOF(); - - // Add a BSON representation of the filter to the stats tree, if there is one. - if (NULL != _filter) { - BSONObjBuilder bob; - _filter->toBSON(&bob); - _commonStats.filter = bob.obj(); - } - - unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_FETCH)); - ret->specific.reset(new FetchStats(_specificStats)); - ret->children.push_back(_child->getStats()); - return ret.release(); - } + unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_FETCH)); + ret->specific.reset(new FetchStats(_specificStats)); + ret->children.push_back(_child->getStats()); + return ret.release(); +} - const CommonStats* FetchStage::getCommonStats() const { - return &_commonStats; - } +const CommonStats* FetchStage::getCommonStats() const { + return &_commonStats; +} - const SpecificStats* FetchStage::getSpecificStats() const { - return &_specificStats; - } +const SpecificStats* FetchStage::getSpecificStats() const { + return &_specificStats; +} } // namespace mongo |