summary | refs | log | tree | commit | diff
path: root/src/mongo/db/exec/collection_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/collection_scan.cpp')
-rw-r--r-- src/mongo/db/exec/collection_scan.cpp | 366
1 files changed, 181 insertions, 185 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 1a0c16c6b55..f0e09f31629 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -42,225 +42,221 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
-#include "mongo/db/client.h" // XXX-ERH
+#include "mongo/db/client.h" // XXX-ERH
namespace mongo {
- using std::unique_ptr;
- using std::vector;
-
- // static
- const char* CollectionScan::kStageType = "COLLSCAN";
-
- CollectionScan::CollectionScan(OperationContext* txn,
- const CollectionScanParams& params,
- WorkingSet* workingSet,
- const MatchExpression* filter)
- : _txn(txn),
- _workingSet(workingSet),
- _filter(filter),
- _params(params),
- _isDead(false),
- _wsidForFetch(_workingSet->allocate()),
- _commonStats(kStageType) {
- // Explain reports the direction of the collection scan.
- _specificStats.direction = params.direction;
-
- // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used
- // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but
- // the loc isn't really pointing at any obj. The obj field of the WSM should never be used.
- WorkingSetMember* member = _workingSet->get(_wsidForFetch);
- member->state = WorkingSetMember::LOC_AND_OWNED_OBJ;
+using std::unique_ptr;
+using std::vector;
+
+// static
+const char* CollectionScan::kStageType = "COLLSCAN";
+
+CollectionScan::CollectionScan(OperationContext* txn,
+ const CollectionScanParams& params,
+ WorkingSet* workingSet,
+ const MatchExpression* filter)
+ : _txn(txn),
+ _workingSet(workingSet),
+ _filter(filter),
+ _params(params),
+ _isDead(false),
+ _wsidForFetch(_workingSet->allocate()),
+ _commonStats(kStageType) {
+ // Explain reports the direction of the collection scan.
+ _specificStats.direction = params.direction;
+
+ // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used
+ // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but
+ // the loc isn't really pointing at any obj. The obj field of the WSM should never be used.
+ WorkingSetMember* member = _workingSet->get(_wsidForFetch);
+ member->state = WorkingSetMember::LOC_AND_OWNED_OBJ;
+}
+
+PlanStage::StageState CollectionScan::work(WorkingSetID* out) {
+ ++_commonStats.works;
+
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ if (_isDead) {
+ Status status(ErrorCodes::InternalError, "CollectionScan died");
+ *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
+ return PlanStage::DEAD;
}
- PlanStage::StageState CollectionScan::work(WorkingSetID* out) {
- ++_commonStats.works;
-
- // Adds the amount of time taken by work() to executionTimeMillis.
- ScopedTimer timer(&_commonStats.executionTimeMillis);
-
- if (_isDead) {
- Status status(ErrorCodes::InternalError, "CollectionScan died");
- *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
- return PlanStage::DEAD;
- }
+ if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) {
+ _commonStats.isEOF = true;
+ }
- if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) {
- _commonStats.isEOF = true;
- }
+ if (_commonStats.isEOF) {
+ return PlanStage::IS_EOF;
+ }
- if (_commonStats.isEOF) { return PlanStage::IS_EOF; }
-
- boost::optional<Record> record;
- const bool needToMakeCursor = !_cursor;
- try {
- if (needToMakeCursor) {
- const bool forward = _params.direction == CollectionScanParams::FORWARD;
- _cursor = _params.collection->getCursor(_txn, forward);
-
- if (!_lastSeenId.isNull()) {
- invariant(_params.tailable);
- // Seek to where we were last time. If it no longer exists, mark us as dead
- // since we want to signal an error rather than silently dropping data from the
- // stream. This is related to the _lastSeenId handling in invalidate. Note that
- // we want to return the record *after* this one since we have already returned
- // this one. This is only possible in the tailing case because that is the only
- // time we'd need to create a cursor after already getting a record out of it.
- if (!_cursor->seekExact(_lastSeenId)) {
- _isDead = true;
- Status status(ErrorCodes::InternalError,
- "CollectionScan died: Unexpected RecordId");
- *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
- return PlanStage::DEAD;
- }
+ boost::optional<Record> record;
+ const bool needToMakeCursor = !_cursor;
+ try {
+ if (needToMakeCursor) {
+ const bool forward = _params.direction == CollectionScanParams::FORWARD;
+ _cursor = _params.collection->getCursor(_txn, forward);
+
+ if (!_lastSeenId.isNull()) {
+ invariant(_params.tailable);
+ // Seek to where we were last time. If it no longer exists, mark us as dead
+ // since we want to signal an error rather than silently dropping data from the
+ // stream. This is related to the _lastSeenId handling in invalidate. Note that
+ // we want to return the record *after* this one since we have already returned
+ // this one. This is only possible in the tailing case because that is the only
+ // time we'd need to create a cursor after already getting a record out of it.
+ if (!_cursor->seekExact(_lastSeenId)) {
+ _isDead = true;
+ Status status(ErrorCodes::InternalError,
+ "CollectionScan died: Unexpected RecordId");
+ *out = WorkingSetCommon::allocateStatusMember(_workingSet, status);
+ return PlanStage::DEAD;
}
-
- _commonStats.needTime++;
- return PlanStage::NEED_TIME;
- }
-
- if (_lastSeenId.isNull() && !_params.start.isNull()) {
- record = _cursor->seekExact(_params.start);
}
- else {
- // See if the record we're about to access is in memory. If not, pass a fetch
- // request up.
- if (auto fetcher = _cursor->fetcherForNext()) {
- // Pass the RecordFetcher up.
- WorkingSetMember* member = _workingSet->get(_wsidForFetch);
- member->setFetcher(fetcher.release());
- *out = _wsidForFetch;
- _commonStats.needYield++;
- return PlanStage::NEED_YIELD;
- }
- record = _cursor->next();
- }
- }
- catch (const WriteConflictException& wce) {
- // Leave us in a state to try again next time.
- if (needToMakeCursor)
- _cursor.reset();
- *out = WorkingSet::INVALID_ID;
- return PlanStage::NEED_YIELD;
+ _commonStats.needTime++;
+ return PlanStage::NEED_TIME;
}
- if (!record) {
- // We just hit EOF. If we are tailable and have already returned data, leave us in a
- // state to pick up where we left off on the next call to work(). Otherwise EOF is
- // permanent.
- if (_params.tailable && !_lastSeenId.isNull()) {
- _cursor.reset();
- }
- else {
- _commonStats.isEOF = true;
+ if (_lastSeenId.isNull() && !_params.start.isNull()) {
+ record = _cursor->seekExact(_params.start);
+ } else {
+ // See if the record we're about to access is in memory. If not, pass a fetch
+ // request up.
+ if (auto fetcher = _cursor->fetcherForNext()) {
+ // Pass the RecordFetcher up.
+ WorkingSetMember* member = _workingSet->get(_wsidForFetch);
+ member->setFetcher(fetcher.release());
+ *out = _wsidForFetch;
+ _commonStats.needYield++;
+ return PlanStage::NEED_YIELD;
}
-
- return PlanStage::IS_EOF;
- }
-
- _lastSeenId = record->id;
-
- WorkingSetID id = _workingSet->allocate();
- WorkingSetMember* member = _workingSet->get(id);
- member->loc = record->id;
- member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()};
- member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
- return returnIfMatches(member, id, out);
+ record = _cursor->next();
+ }
+ } catch (const WriteConflictException& wce) {
+ // Leave us in a state to try again next time.
+ if (needToMakeCursor)
+ _cursor.reset();
+ *out = WorkingSet::INVALID_ID;
+ return PlanStage::NEED_YIELD;
}
- PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
- WorkingSetID memberID,
- WorkingSetID* out) {
- ++_specificStats.docsTested;
-
- if (Filter::passes(member, _filter)) {
- *out = memberID;
- ++_commonStats.advanced;
- return PlanStage::ADVANCED;
- }
- else {
- _workingSet->free(memberID);
- ++_commonStats.needTime;
- return PlanStage::NEED_TIME;
+ if (!record) {
+ // We just hit EOF. If we are tailable and have already returned data, leave us in a
+ // state to pick up where we left off on the next call to work(). Otherwise EOF is
+ // permanent.
+ if (_params.tailable && !_lastSeenId.isNull()) {
+ _cursor.reset();
+ } else {
+ _commonStats.isEOF = true;
}
+
+ return PlanStage::IS_EOF;
}
- bool CollectionScan::isEOF() {
- return _commonStats.isEOF || _isDead;
+ _lastSeenId = record->id;
+
+ WorkingSetID id = _workingSet->allocate();
+ WorkingSetMember* member = _workingSet->get(id);
+ member->loc = record->id;
+ member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()};
+ member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+
+ return returnIfMatches(member, id, out);
+}
+
+PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
+ WorkingSetID memberID,
+ WorkingSetID* out) {
+ ++_specificStats.docsTested;
+
+ if (Filter::passes(member, _filter)) {
+ *out = memberID;
+ ++_commonStats.advanced;
+ return PlanStage::ADVANCED;
+ } else {
+ _workingSet->free(memberID);
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
}
+}
- void CollectionScan::invalidate(OperationContext* txn,
- const RecordId& id,
- InvalidationType type) {
- ++_commonStats.invalidates;
+bool CollectionScan::isEOF() {
+ return _commonStats.isEOF || _isDead;
+}
- // We don't care about mutations since we apply any filters to the result when we (possibly)
- // return it.
- if (INVALIDATION_DELETION != type) {
- return;
- }
+void CollectionScan::invalidate(OperationContext* txn, const RecordId& id, InvalidationType type) {
+ ++_commonStats.invalidates;
- // If we're here, 'id' is being deleted.
+ // We don't care about mutations since we apply any filters to the result when we (possibly)
+ // return it.
+ if (INVALIDATION_DELETION != type) {
+ return;
+ }
- // Deletions can harm the underlying RecordCursor so we must pass them down.
- if (_cursor) {
- _cursor->invalidate(id);
- }
+ // If we're here, 'id' is being deleted.
- if (_params.tailable && id == _lastSeenId) {
- // This means that deletes have caught up to the reader. We want to error in this case
- // so readers don't miss potentially important data.
- _isDead = true;
- }
+ // Deletions can harm the underlying RecordCursor so we must pass them down.
+ if (_cursor) {
+ _cursor->invalidate(id);
}
- void CollectionScan::saveState() {
- _txn = NULL;
- ++_commonStats.yields;
- if (_cursor) {
- _cursor->savePositioned();
- }
+ if (_params.tailable && id == _lastSeenId) {
+ // This means that deletes have caught up to the reader. We want to error in this case
+ // so readers don't miss potentially important data.
+ _isDead = true;
}
+}
- void CollectionScan::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
- ++_commonStats.unyields;
- if (_cursor) {
- if (!_cursor->restore(opCtx)) {
- warning() << "Collection dropped or state deleted during yield of CollectionScan: "
- << opCtx->getNS();
- _isDead = true;
- }
+void CollectionScan::saveState() {
+ _txn = NULL;
+ ++_commonStats.yields;
+ if (_cursor) {
+ _cursor->savePositioned();
+ }
+}
+
+void CollectionScan::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ ++_commonStats.unyields;
+ if (_cursor) {
+ if (!_cursor->restore(opCtx)) {
+ warning() << "Collection dropped or state deleted during yield of CollectionScan: "
+ << opCtx->getNS();
+ _isDead = true;
}
}
-
- vector<PlanStage*> CollectionScan::getChildren() const {
- vector<PlanStage*> empty;
- return empty;
+}
+
+vector<PlanStage*> CollectionScan::getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+}
+
+PlanStageStats* CollectionScan::getStats() {
+ // Add a BSON representation of the filter to the stats tree, if there is one.
+ if (NULL != _filter) {
+ BSONObjBuilder bob;
+ _filter->toBSON(&bob);
+ _commonStats.filter = bob.obj();
}
- PlanStageStats* CollectionScan::getStats() {
- // Add a BSON representation of the filter to the stats tree, if there is one.
- if (NULL != _filter) {
- BSONObjBuilder bob;
- _filter->toBSON(&bob);
- _commonStats.filter = bob.obj();
- }
+ unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_COLLSCAN));
+ ret->specific.reset(new CollectionScanStats(_specificStats));
+ return ret.release();
+}
- unique_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_COLLSCAN));
- ret->specific.reset(new CollectionScanStats(_specificStats));
- return ret.release();
- }
-
- const CommonStats* CollectionScan::getCommonStats() const {
- return &_commonStats;
- }
+const CommonStats* CollectionScan::getCommonStats() const {
+ return &_commonStats;
+}
- const SpecificStats* CollectionScan::getSpecificStats() const {
- return &_specificStats;
- }
+const SpecificStats* CollectionScan::getSpecificStats() const {
+ return &_specificStats;
+}
} // namespace mongo