summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/oplogstart.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/oplogstart.cpp')
-rw-r--r--src/mongo/db/exec/oplogstart.cpp257
1 files changed, 132 insertions, 125 deletions
diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp
index 56668b4b4fd..30923a758be 100644
--- a/src/mongo/db/exec/oplogstart.cpp
+++ b/src/mongo/db/exec/oplogstart.cpp
@@ -34,161 +34,168 @@
namespace mongo {
- using std::vector;
-
- // Does not take ownership.
- OplogStart::OplogStart(OperationContext* txn,
- const Collection* collection,
- MatchExpression* filter,
- WorkingSet* ws)
- : _txn(txn),
- _needInit(true),
- _backwardsScanning(false),
- _extentHopping(false),
- _done(false),
- _collection(collection),
- _workingSet(ws),
- _filter(filter) { }
-
- OplogStart::~OplogStart() { }
-
- PlanStage::StageState OplogStart::work(WorkingSetID* out) {
- // We do our (heavy) init in a work(), where work is expected.
- if (_needInit) {
- CollectionScanParams params;
- params.collection = _collection;
- params.direction = CollectionScanParams::BACKWARD;
- _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL));
- _needInit = false;
- _backwardsScanning = true;
- _timer.reset();
- }
+using std::vector;
+
+// Does not take ownership.
+OplogStart::OplogStart(OperationContext* txn,
+ const Collection* collection,
+ MatchExpression* filter,
+ WorkingSet* ws)
+ : _txn(txn),
+ _needInit(true),
+ _backwardsScanning(false),
+ _extentHopping(false),
+ _done(false),
+ _collection(collection),
+ _workingSet(ws),
+ _filter(filter) {}
+
+OplogStart::~OplogStart() {}
+
+PlanStage::StageState OplogStart::work(WorkingSetID* out) {
+ // We do our (heavy) init in a work(), where work is expected.
+ if (_needInit) {
+ CollectionScanParams params;
+ params.collection = _collection;
+ params.direction = CollectionScanParams::BACKWARD;
+ _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL));
+ _needInit = false;
+ _backwardsScanning = true;
+ _timer.reset();
+ }
- // If we're still reading backwards, keep trying until timing out.
- if (_backwardsScanning) {
- verify(!_extentHopping);
- // Still have time to succeed with reading backwards.
- if (_timer.seconds() < _backwardsScanTime) {
- return workBackwardsScan(out);
- }
- switchToExtentHopping();
+ // If we're still reading backwards, keep trying until timing out.
+ if (_backwardsScanning) {
+ verify(!_extentHopping);
+ // Still have time to succeed with reading backwards.
+ if (_timer.seconds() < _backwardsScanTime) {
+ return workBackwardsScan(out);
}
-
- // Don't find it in time? Swing from extent to extent like tarzan.com.
- verify(_extentHopping);
- return workExtentHopping(out);
+ switchToExtentHopping();
}
- PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) {
- if (_done || _subIterators.empty()) {
- return PlanStage::IS_EOF;
- }
+ // Don't find it in time? Swing from extent to extent like tarzan.com.
+ verify(_extentHopping);
+ return workExtentHopping(out);
+}
- // we work from the back to the front since the back has the newest data.
- const RecordId loc = _subIterators.back()->getNext();
- _subIterators.popAndDeleteBack();
-
- // TODO: should we ever try and return NEED_FETCH here?
- if (!loc.isNull() && !_filter->matchesBSON(_collection->docFor(_txn, loc).value())) {
- _done = true;
- WorkingSetID id = _workingSet->allocate();
- WorkingSetMember* member = _workingSet->get(id);
- member->loc = loc;
- member->obj = _collection->docFor(_txn, member->loc);
- member->state = WorkingSetMember::LOC_AND_OBJ;
- *out = id;
- return PlanStage::ADVANCED;
- }
+PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) {
+ if (_done || _subIterators.empty()) {
+ return PlanStage::IS_EOF;
+ }
- return PlanStage::NEED_TIME;
+ // we work from the back to the front since the back has the newest data.
+ const RecordId loc = _subIterators.back()->getNext();
+ _subIterators.popAndDeleteBack();
+
+ // TODO: should we ever try and return NEED_FETCH here?
+ if (!loc.isNull() && !_filter->matchesBSON(_collection->docFor(_txn, loc).value())) {
+ _done = true;
+ WorkingSetID id = _workingSet->allocate();
+ WorkingSetMember* member = _workingSet->get(id);
+ member->loc = loc;
+ member->obj = _collection->docFor(_txn, member->loc);
+ member->state = WorkingSetMember::LOC_AND_OBJ;
+ *out = id;
+ return PlanStage::ADVANCED;
}
- void OplogStart::switchToExtentHopping() {
- // Transition from backwards scanning to extent hopping.
- _backwardsScanning = false;
- _extentHopping = true;
+ return PlanStage::NEED_TIME;
+}
- // Toss the collection scan we were using.
- _cs.reset();
+void OplogStart::switchToExtentHopping() {
+ // Transition from backwards scanning to extent hopping.
+ _backwardsScanning = false;
+ _extentHopping = true;
- // Set up our extent hopping state.
- _subIterators = _collection->getManyIterators(_txn);
- }
+ // Toss the collection scan we were using.
+ _cs.reset();
- PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) {
- PlanStage::StageState state = _cs->work(out);
+ // Set up our extent hopping state.
+ _subIterators = _collection->getManyIterators(_txn);
+}
- // EOF. Just start from the beginning, which is where we've hit.
- if (PlanStage::IS_EOF == state) {
- _done = true;
- return state;
- }
+PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) {
+ PlanStage::StageState state = _cs->work(out);
- if (PlanStage::ADVANCED != state) { return state; }
+ // EOF. Just start from the beginning, which is where we've hit.
+ if (PlanStage::IS_EOF == state) {
+ _done = true;
+ return state;
+ }
- WorkingSetMember* member = _workingSet->get(*out);
- verify(member->hasObj());
- verify(member->hasLoc());
+ if (PlanStage::ADVANCED != state) {
+ return state;
+ }
- if (!_filter->matchesBSON(member->obj.value())) {
- _done = true;
- // RecordId is returned in *out.
- return PlanStage::ADVANCED;
- }
- else {
- _workingSet->free(*out);
- return PlanStage::NEED_TIME;
- }
+ WorkingSetMember* member = _workingSet->get(*out);
+ verify(member->hasObj());
+ verify(member->hasLoc());
+
+ if (!_filter->matchesBSON(member->obj.value())) {
+ _done = true;
+ // RecordId is returned in *out.
+ return PlanStage::ADVANCED;
+ } else {
+ _workingSet->free(*out);
+ return PlanStage::NEED_TIME;
}
+}
- bool OplogStart::isEOF() { return _done; }
+bool OplogStart::isEOF() {
+ return _done;
+}
- void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
- if (_needInit) { return; }
+void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ if (_needInit) {
+ return;
+ }
- if (INVALIDATION_DELETION != type) { return; }
+ if (INVALIDATION_DELETION != type) {
+ return;
+ }
- if (_cs) {
- _cs->invalidate(txn, dl, type);
- }
+ if (_cs) {
+ _cs->invalidate(txn, dl, type);
+ }
- for (size_t i = 0; i < _subIterators.size(); i++) {
- _subIterators[i]->invalidate(dl);
- }
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ _subIterators[i]->invalidate(dl);
}
+}
- void OplogStart::saveState() {
- _txn = NULL;
- if (_cs) {
- _cs->saveState();
- }
+void OplogStart::saveState() {
+ _txn = NULL;
+ if (_cs) {
+ _cs->saveState();
+ }
- for (size_t i = 0; i < _subIterators.size(); i++) {
- _subIterators[i]->saveState();
- }
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ _subIterators[i]->saveState();
}
+}
- void OplogStart::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
- if (_cs) {
- _cs->restoreState(opCtx);
- }
+void OplogStart::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ if (_cs) {
+ _cs->restoreState(opCtx);
+ }
- for (size_t i = 0; i < _subIterators.size(); i++) {
- if (!_subIterators[i]->restoreState(opCtx)) {
- _subIterators.erase(_subIterators.begin() + i);
- // need to hit same i on next pass through loop
- i--;
- }
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ if (!_subIterators[i]->restoreState(opCtx)) {
+ _subIterators.erase(_subIterators.begin() + i);
+ // need to hit same i on next pass through loop
+ i--;
}
}
+}
- vector<PlanStage*> OplogStart::getChildren() const {
- vector<PlanStage*> empty;
- return empty;
- }
+vector<PlanStage*> OplogStart::getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+}
- int OplogStart::_backwardsScanTime = 5;
+int OplogStart::_backwardsScanTime = 5;
} // namespace mongo