diff options
Diffstat (limited to 'src/mongo/db/exec/collection_scan.cpp')
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 366 |
1 files changed, 181 insertions, 185 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 1a0c16c6b55..f0e09f31629 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -42,225 +42,221 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/db/client.h" // XXX-ERH +#include "mongo/db/client.h" // XXX-ERH namespace mongo { - using std::unique_ptr; - using std::vector; - - // static - const char* CollectionScan::kStageType = "COLLSCAN"; - - CollectionScan::CollectionScan(OperationContext* txn, - const CollectionScanParams& params, - WorkingSet* workingSet, - const MatchExpression* filter) - : _txn(txn), - _workingSet(workingSet), - _filter(filter), - _params(params), - _isDead(false), - _wsidForFetch(_workingSet->allocate()), - _commonStats(kStageType) { - // Explain reports the direction of the collection scan. - _specificStats.direction = params.direction; - - // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used - // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but - // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. - WorkingSetMember* member = _workingSet->get(_wsidForFetch); - member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; +using std::unique_ptr; +using std::vector; + +// static +const char* CollectionScan::kStageType = "COLLSCAN"; + +CollectionScan::CollectionScan(OperationContext* txn, + const CollectionScanParams& params, + WorkingSet* workingSet, + const MatchExpression* filter) + : _txn(txn), + _workingSet(workingSet), + _filter(filter), + _params(params), + _isDead(false), + _wsidForFetch(_workingSet->allocate()), + _commonStats(kStageType) { + // Explain reports the direction of the collection scan. + _specificStats.direction = params.direction; + + // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used + // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but + // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. + WorkingSetMember* member = _workingSet->get(_wsidForFetch); + member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; +} + +PlanStage::StageState CollectionScan::work(WorkingSetID* out) { + ++_commonStats.works; + + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_commonStats.executionTimeMillis); + + if (_isDead) { + Status status(ErrorCodes::InternalError, "CollectionScan died"); + *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); + return PlanStage::DEAD; } - PlanStage::StageState CollectionScan::work(WorkingSetID* out) { - ++_commonStats.works; - - // Adds the amount of time taken by work() to executionTimeMillis. - ScopedTimer timer(&_commonStats.executionTimeMillis); - - if (_isDead) { - Status status(ErrorCodes::InternalError, "CollectionScan died"); - *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); - return PlanStage::DEAD; - } + if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) { + _commonStats.isEOF = true; + } - if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) { - _commonStats.isEOF = true; - } + if (_commonStats.isEOF) { + return PlanStage::IS_EOF; + } - if (_commonStats.isEOF) { return PlanStage::IS_EOF; } - - boost::optional<Record> record; - const bool needToMakeCursor = !_cursor; - try { - if (needToMakeCursor) { - const bool forward = _params.direction == CollectionScanParams::FORWARD; - _cursor = _params.collection->getCursor(_txn, forward); - - if (!_lastSeenId.isNull()) { - invariant(_params.tailable); - // Seek to where we were last time. If it no longer exists, mark us as dead - // since we want to signal an error rather than silently dropping data from the - // stream. This is related to the _lastSeenId handling in invalidate. Note that - // we want to return the record *after* this one since we have already returned - // this one. This is only possible in the tailing case because that is the only - // time we'd need to create a cursor after already getting a record out of it. - if (!_cursor->seekExact(_lastSeenId)) { - _isDead = true; - Status status(ErrorCodes::InternalError, - "CollectionScan died: Unexpected RecordId"); - *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); - return PlanStage::DEAD; - } + boost::optional<Record> record; + const bool needToMakeCursor = !_cursor; + try { + if (needToMakeCursor) { + const bool forward = _params.direction == CollectionScanParams::FORWARD; + _cursor = _params.collection->getCursor(_txn, forward); + + if (!_lastSeenId.isNull()) { + invariant(_params.tailable); + // Seek to where we were last time. If it no longer exists, mark us as dead + // since we want to signal an error rather than silently dropping data from the + // stream. This is related to the _lastSeenId handling in invalidate. Note that + // we want to return the record *after* this one since we have already returned + // this one. This is only possible in the tailing case because that is the only + // time we'd need to create a cursor after already getting a record out of it. + if (!_cursor->seekExact(_lastSeenId)) { + _isDead = true; + Status status(ErrorCodes::InternalError, + "CollectionScan died: Unexpected RecordId"); + *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); + return PlanStage::DEAD; } - - _commonStats.needTime++; - return PlanStage::NEED_TIME; - } - - if (_lastSeenId.isNull() && !_params.start.isNull()) { - record = _cursor->seekExact(_params.start); } - else { - // See if the record we're about to access is in memory. If not, pass a fetch - // request up. - if (auto fetcher = _cursor->fetcherForNext()) { - // Pass the RecordFetcher up. - WorkingSetMember* member = _workingSet->get(_wsidForFetch); - member->setFetcher(fetcher.release()); - *out = _wsidForFetch; - _commonStats.needYield++; - return PlanStage::NEED_YIELD; - } - record = _cursor->next(); - } - } - catch (const WriteConflictException& wce) { - // Leave us in a state to try again next time. - if (needToMakeCursor) - _cursor.reset(); - *out = WorkingSet::INVALID_ID; - return PlanStage::NEED_YIELD; + _commonStats.needTime++; + return PlanStage::NEED_TIME; } - if (!record) { - // We just hit EOF. If we are tailable and have already returned data, leave us in a - // state to pick up where we left off on the next call to work(). Otherwise EOF is - // permanent. - if (_params.tailable && !_lastSeenId.isNull()) { - _cursor.reset(); - } - else { - _commonStats.isEOF = true; + if (_lastSeenId.isNull() && !_params.start.isNull()) { + record = _cursor->seekExact(_params.start); + } else { + // See if the record we're about to access is in memory. If not, pass a fetch + // request up. + if (auto fetcher = _cursor->fetcherForNext()) { + // Pass the RecordFetcher up. + WorkingSetMember* member = _workingSet->get(_wsidForFetch); + member->setFetcher(fetcher.release()); + *out = _wsidForFetch; + _commonStats.needYield++; + return PlanStage::NEED_YIELD; } - - return PlanStage::IS_EOF; - } - - _lastSeenId = record->id; - - WorkingSetID id = _workingSet->allocate(); - WorkingSetMember* member = _workingSet->get(id); - member->loc = record->id; - member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - return returnIfMatches(member, id, out); + record = _cursor->next(); + } + } catch (const WriteConflictException& wce) { + // Leave us in a state to try again next time. + if (needToMakeCursor) + _cursor.reset(); + *out = WorkingSet::INVALID_ID; + return PlanStage::NEED_YIELD; } - PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, - WorkingSetID memberID, - WorkingSetID* out) { - ++_specificStats.docsTested; - - if (Filter::passes(member, _filter)) { - *out = memberID; - ++_commonStats.advanced; - return PlanStage::ADVANCED; - } - else { - _workingSet->free(memberID); - ++_commonStats.needTime; - return PlanStage::NEED_TIME; + if (!record) { + // We just hit EOF. If we are tailable and have already returned data, leave us in a + // state to pick up where we left off on the next call to work(). Otherwise EOF is + // permanent. + if (_params.tailable && !_lastSeenId.isNull()) { + _cursor.reset(); + } else { + _commonStats.isEOF = true; } + + return PlanStage::IS_EOF; } - bool CollectionScan::isEOF() { - return _commonStats.isEOF || _isDead; + _lastSeenId = record->id; + + WorkingSetID id = _workingSet->allocate(); + WorkingSetMember* member = _workingSet->get(id); + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + + return returnIfMatches(member, id, out); +} + +PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, + WorkingSetID memberID, + WorkingSetID* out) { + ++_specificStats.docsTested; + + if (Filter::passes(member, _filter)) { + *out = memberID; + ++_commonStats.advanced; + return PlanStage::ADVANCED; + } else { + _workingSet->free(memberID); + ++_commonStats.needTime; + return PlanStage::NEED_TIME; } +} - void CollectionScan::invalidate(OperationContext* txn, - const RecordId& id, - InvalidationType type) { - ++_commonStats.invalidates; +bool CollectionScan::isEOF() { + return _commonStats.isEOF || _isDead; +} - // We don't care about mutations since we apply any filters to the result when we (possibly) - // return it. - if (INVALIDATION_DELETION != type) { - return; - } +void CollectionScan::invalidate(OperationContext* txn, const RecordId& id, InvalidationType type) { + ++_commonStats.invalidates; - // If we're here, 'id' is being deleted. + // We don't care about mutations since we apply any filters to the result when we (possibly) + // return it. + if (INVALIDATION_DELETION != type) { + return; + } - // Deletions can harm the underlying RecordCursor so we must pass them down. - if (_cursor) { - _cursor->invalidate(id); - } + // If we're here, 'id' is being deleted. - if (_params.tailable && id == _lastSeenId) { - // This means that deletes have caught up to the reader. We want to error in this case - // so readers don't miss potentially important data. - _isDead = true; - } + // Deletions can harm the underlying RecordCursor so we must pass them down. + if (_cursor) { + _cursor->invalidate(id); } - void CollectionScan::saveState() { - _txn = NULL; - ++_commonStats.yields; - if (_cursor) { - _cursor->savePositioned(); - } + if (_params.tailable && id == _lastSeenId) { + // This means that deletes have caught up to the reader. We want to error in this case + // so readers don't miss potentially important data. + _isDead = true; } +} - void CollectionScan::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - ++_commonStats.unyields; - if (_cursor) { - if (!_cursor->restore(opCtx)) { - warning() << "Collection dropped or state deleted during yield of CollectionScan: " - << opCtx->getNS(); - _isDead = true; - } +void CollectionScan::saveState() { + _txn = NULL; + ++_commonStats.yields; + if (_cursor) { + _cursor->savePositioned(); + } +} + +void CollectionScan::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + ++_commonStats.unyields; + if (_cursor) { + if (!_cursor->restore(opCtx)) { + warning() << "Collection dropped or state deleted during yield of CollectionScan: " + << opCtx->getNS(); + _isDead = true; } } - - vector<PlanStage*> CollectionScan::getChildren() const { - vector<PlanStage*> empty; - return empty; +} + +vector<PlanStage*> CollectionScan::getChildren() const { + vector<PlanStage*> empty; + return empty; +} + +PlanStageStats* CollectionScan::getStats() { + // Add a BSON representation of the filter to the stats tree, if there is one. + if (NULL != _filter) { + BSONObjBuilder bob; + _filter->toBSON(&bob); + _commonStats.filter = bob.obj(); } - PlanStageStats* CollectionScan::getStats() { - // Add a BSON representation of the filter to the stats tree, if there is one. - if (NULL != _filter) { - BSONObjBuilder bob; - _filter->toBSON(&bob); - _commonStats.filter = bob.obj(); - } + unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_COLLSCAN)); + ret->specific.reset(new CollectionScanStats(_specificStats)); + return ret.release(); +} - unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_COLLSCAN)); - ret->specific.reset(new CollectionScanStats(_specificStats)); - return ret.release(); - } - - const CommonStats* CollectionScan::getCommonStats() const { - return &_commonStats; - } +const CommonStats* CollectionScan::getCommonStats() const { + return &_commonStats; +} - const SpecificStats* CollectionScan::getSpecificStats() const { - return &_specificStats; - } +const SpecificStats* CollectionScan::getSpecificStats() const { + return &_specificStats; +} } // namespace mongo |