summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/oplogstart.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec/oplogstart.cpp')
-rw-r--r--src/mongo/db/exec/oplogstart.cpp299
1 files changed, 152 insertions, 147 deletions
diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp
index 92de52db505..d05ddfc2f44 100644
--- a/src/mongo/db/exec/oplogstart.cpp
+++ b/src/mongo/db/exec/oplogstart.cpp
@@ -35,187 +35,192 @@
namespace mongo {
- using std::vector;
-
- const char* OplogStart::kStageType = "OPLOG_START";
-
- // Does not take ownership.
- OplogStart::OplogStart(OperationContext* txn,
- const Collection* collection,
- MatchExpression* filter,
- WorkingSet* ws)
- : _txn(txn),
- _needInit(true),
- _backwardsScanning(false),
- _extentHopping(false),
- _done(false),
- _collection(collection),
- _workingSet(ws),
- _filter(filter) { }
-
- OplogStart::~OplogStart() { }
-
- PlanStage::StageState OplogStart::work(WorkingSetID* out) {
- // We do our (heavy) init in a work(), where work is expected.
- if (_needInit) {
- CollectionScanParams params;
- params.collection = _collection;
- params.direction = CollectionScanParams::BACKWARD;
- _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL));
-
- _needInit = false;
- _backwardsScanning = true;
- _timer.reset();
- }
-
- // If we're still reading backwards, keep trying until timing out.
- if (_backwardsScanning) {
- verify(!_extentHopping);
- // Still have time to succeed with reading backwards.
- if (_timer.seconds() < _backwardsScanTime) {
- return workBackwardsScan(out);
- }
-
- try {
- // If this throws WCE, it leave us in a state were the next call to work will retry.
- switchToExtentHopping();
- }
- catch (const WriteConflictException& wce) {
- _subIterators.clear();
- *out = WorkingSet::INVALID_ID;
- return NEED_YIELD;
- }
- }
-
- // Don't find it in time? Swing from extent to extent like tarzan.com.
- verify(_extentHopping);
- return workExtentHopping(out);
+using std::vector;
+
+const char* OplogStart::kStageType = "OPLOG_START";
+
+// Does not take ownership.
+OplogStart::OplogStart(OperationContext* txn,
+ const Collection* collection,
+ MatchExpression* filter,
+ WorkingSet* ws)
+ : _txn(txn),
+ _needInit(true),
+ _backwardsScanning(false),
+ _extentHopping(false),
+ _done(false),
+ _collection(collection),
+ _workingSet(ws),
+ _filter(filter) {}
+
+OplogStart::~OplogStart() {}
+
+PlanStage::StageState OplogStart::work(WorkingSetID* out) {
+ // We do our (heavy) init in a work(), where work is expected.
+ if (_needInit) {
+ CollectionScanParams params;
+ params.collection = _collection;
+ params.direction = CollectionScanParams::BACKWARD;
+ _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL));
+
+ _needInit = false;
+ _backwardsScanning = true;
+ _timer.reset();
}
- PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) {
- if (_done || _subIterators.empty()) {
- return PlanStage::IS_EOF;
+ // If we're still reading backwards, keep trying until timing out.
+ if (_backwardsScanning) {
+ verify(!_extentHopping);
+ // Still have time to succeed with reading backwards.
+ if (_timer.seconds() < _backwardsScanTime) {
+ return workBackwardsScan(out);
}
- // we work from the back to the front since the back has the newest data.
try {
- // TODO: should we ever check fetcherForNext()?
- if (auto record = _subIterators.back()->next()) {
- BSONObj obj = record->data.releaseToBson();
- if (!_filter->matchesBSON(obj)) {
- _done = true;
- WorkingSetID id = _workingSet->allocate();
- WorkingSetMember* member = _workingSet->get(id);
- member->loc = record->id;
- member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)};
- member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
- *out = id;
- return PlanStage::ADVANCED;
- }
- }
- }
- catch (const WriteConflictException& wce) {
+ // If this throws WCE, it leave us in a state were the next call to work will retry.
+ switchToExtentHopping();
+ } catch (const WriteConflictException& wce) {
+ _subIterators.clear();
*out = WorkingSet::INVALID_ID;
- return PlanStage::NEED_YIELD;
+ return NEED_YIELD;
}
-
- _subIterators.pop_back();
- return PlanStage::NEED_TIME;
}
- void OplogStart::switchToExtentHopping() {
- // Set up our extent hopping state.
- _subIterators = _collection->getManyCursors(_txn);
+ // Don't find it in time? Swing from extent to extent like tarzan.com.
+ verify(_extentHopping);
+ return workExtentHopping(out);
+}
- // Transition from backwards scanning to extent hopping.
- _backwardsScanning = false;
- _extentHopping = true;
+PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) {
+ if (_done || _subIterators.empty()) {
+ return PlanStage::IS_EOF;
+ }
- // Toss the collection scan we were using.
- _cs.reset();
+ // we work from the back to the front since the back has the newest data.
+ try {
+ // TODO: should we ever check fetcherForNext()?
+ if (auto record = _subIterators.back()->next()) {
+ BSONObj obj = record->data.releaseToBson();
+ if (!_filter->matchesBSON(obj)) {
+ _done = true;
+ WorkingSetID id = _workingSet->allocate();
+ WorkingSetMember* member = _workingSet->get(id);
+ member->loc = record->id;
+ member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)};
+ member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+ *out = id;
+ return PlanStage::ADVANCED;
+ }
+ }
+ } catch (const WriteConflictException& wce) {
+ *out = WorkingSet::INVALID_ID;
+ return PlanStage::NEED_YIELD;
}
- PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) {
- PlanStage::StageState state = _cs->work(out);
+ _subIterators.pop_back();
+ return PlanStage::NEED_TIME;
+}
- // EOF. Just start from the beginning, which is where we've hit.
- if (PlanStage::IS_EOF == state) {
- _done = true;
- return state;
- }
+void OplogStart::switchToExtentHopping() {
+ // Set up our extent hopping state.
+ _subIterators = _collection->getManyCursors(_txn);
- if (PlanStage::ADVANCED != state) { return state; }
+ // Transition from backwards scanning to extent hopping.
+ _backwardsScanning = false;
+ _extentHopping = true;
- WorkingSetMember* member = _workingSet->get(*out);
- verify(member->hasObj());
- verify(member->hasLoc());
+ // Toss the collection scan we were using.
+ _cs.reset();
+}
- if (!_filter->matchesBSON(member->obj.value())) {
- _done = true;
- // RecordId is returned in *out.
- return PlanStage::ADVANCED;
- }
- else {
- _workingSet->free(*out);
- return PlanStage::NEED_TIME;
- }
+PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) {
+ PlanStage::StageState state = _cs->work(out);
+
+ // EOF. Just start from the beginning, which is where we've hit.
+ if (PlanStage::IS_EOF == state) {
+ _done = true;
+ return state;
}
- bool OplogStart::isEOF() { return _done; }
+ if (PlanStage::ADVANCED != state) {
+ return state;
+ }
- void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
- if (_needInit) { return; }
+ WorkingSetMember* member = _workingSet->get(*out);
+ verify(member->hasObj());
+ verify(member->hasLoc());
- if (INVALIDATION_DELETION != type) { return; }
+ if (!_filter->matchesBSON(member->obj.value())) {
+ _done = true;
+ // RecordId is returned in *out.
+ return PlanStage::ADVANCED;
+ } else {
+ _workingSet->free(*out);
+ return PlanStage::NEED_TIME;
+ }
+}
- if (_cs) {
- _cs->invalidate(txn, dl, type);
- }
+bool OplogStart::isEOF() {
+ return _done;
+}
- for (size_t i = 0; i < _subIterators.size(); i++) {
- _subIterators[i]->invalidate(dl);
- }
+void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ if (_needInit) {
+ return;
}
- void OplogStart::saveState() {
- _txn = NULL;
- if (_cs) {
- _cs->saveState();
- }
+ if (INVALIDATION_DELETION != type) {
+ return;
+ }
- for (size_t i = 0; i < _subIterators.size(); i++) {
- _subIterators[i]->savePositioned();
- }
+ if (_cs) {
+ _cs->invalidate(txn, dl, type);
}
- void OplogStart::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
- if (_cs) {
- _cs->restoreState(opCtx);
- }
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ _subIterators[i]->invalidate(dl);
+ }
+}
- for (size_t i = 0; i < _subIterators.size(); i++) {
- if (!_subIterators[i]->restore(opCtx)) {
- _subIterators.erase(_subIterators.begin() + i);
- // need to hit same i on next pass through loop
- i--;
- }
- }
+void OplogStart::saveState() {
+ _txn = NULL;
+ if (_cs) {
+ _cs->saveState();
}
- PlanStageStats* OplogStart::getStats() {
- std::unique_ptr<PlanStageStats> ret(new PlanStageStats(CommonStats(kStageType),
- STAGE_OPLOG_START));
- ret->specific.reset(new CollectionScanStats());
- return ret.release();
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ _subIterators[i]->savePositioned();
}
+}
- vector<PlanStage*> OplogStart::getChildren() const {
- vector<PlanStage*> empty;
- return empty;
+void OplogStart::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ if (_cs) {
+ _cs->restoreState(opCtx);
}
- int OplogStart::_backwardsScanTime = 5;
+ for (size_t i = 0; i < _subIterators.size(); i++) {
+ if (!_subIterators[i]->restore(opCtx)) {
+ _subIterators.erase(_subIterators.begin() + i);
+ // need to hit same i on next pass through loop
+ i--;
+ }
+ }
+}
+
+PlanStageStats* OplogStart::getStats() {
+ std::unique_ptr<PlanStageStats> ret(
+ new PlanStageStats(CommonStats(kStageType), STAGE_OPLOG_START));
+ ret->specific.reset(new CollectionScanStats());
+ return ret.release();
+}
+
+vector<PlanStage*> OplogStart::getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+}
+
+int OplogStart::_backwardsScanTime = 5;
} // namespace mongo