diff options
Diffstat (limited to 'src/mongo/db/exec/oplogstart.cpp')
-rw-r--r-- | src/mongo/db/exec/oplogstart.cpp | 299 |
1 files changed, 152 insertions, 147 deletions
diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp index 92de52db505..d05ddfc2f44 100644 --- a/src/mongo/db/exec/oplogstart.cpp +++ b/src/mongo/db/exec/oplogstart.cpp @@ -35,187 +35,192 @@ namespace mongo { - using std::vector; - - const char* OplogStart::kStageType = "OPLOG_START"; - - // Does not take ownership. - OplogStart::OplogStart(OperationContext* txn, - const Collection* collection, - MatchExpression* filter, - WorkingSet* ws) - : _txn(txn), - _needInit(true), - _backwardsScanning(false), - _extentHopping(false), - _done(false), - _collection(collection), - _workingSet(ws), - _filter(filter) { } - - OplogStart::~OplogStart() { } - - PlanStage::StageState OplogStart::work(WorkingSetID* out) { - // We do our (heavy) init in a work(), where work is expected. - if (_needInit) { - CollectionScanParams params; - params.collection = _collection; - params.direction = CollectionScanParams::BACKWARD; - _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL)); - - _needInit = false; - _backwardsScanning = true; - _timer.reset(); - } - - // If we're still reading backwards, keep trying until timing out. - if (_backwardsScanning) { - verify(!_extentHopping); - // Still have time to succeed with reading backwards. - if (_timer.seconds() < _backwardsScanTime) { - return workBackwardsScan(out); - } - - try { - // If this throws WCE, it leave us in a state were the next call to work will retry. - switchToExtentHopping(); - } - catch (const WriteConflictException& wce) { - _subIterators.clear(); - *out = WorkingSet::INVALID_ID; - return NEED_YIELD; - } - } - - // Don't find it in time? Swing from extent to extent like tarzan.com. - verify(_extentHopping); - return workExtentHopping(out); +using std::vector; + +const char* OplogStart::kStageType = "OPLOG_START"; + +// Does not take ownership. +OplogStart::OplogStart(OperationContext* txn, + const Collection* collection, + MatchExpression* filter, + WorkingSet* ws) + : _txn(txn), + _needInit(true), + _backwardsScanning(false), + _extentHopping(false), + _done(false), + _collection(collection), + _workingSet(ws), + _filter(filter) {} + +OplogStart::~OplogStart() {} + +PlanStage::StageState OplogStart::work(WorkingSetID* out) { + // We do our (heavy) init in a work(), where work is expected. + if (_needInit) { + CollectionScanParams params; + params.collection = _collection; + params.direction = CollectionScanParams::BACKWARD; + _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL)); + + _needInit = false; + _backwardsScanning = true; + _timer.reset(); } - PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) { - if (_done || _subIterators.empty()) { - return PlanStage::IS_EOF; + // If we're still reading backwards, keep trying until timing out. + if (_backwardsScanning) { + verify(!_extentHopping); + // Still have time to succeed with reading backwards. + if (_timer.seconds() < _backwardsScanTime) { + return workBackwardsScan(out); } - // we work from the back to the front since the back has the newest data. try { - // TODO: should we ever check fetcherForNext()? - if (auto record = _subIterators.back()->next()) { - BSONObj obj = record->data.releaseToBson(); - if (!_filter->matchesBSON(obj)) { - _done = true; - WorkingSetID id = _workingSet->allocate(); - WorkingSetMember* member = _workingSet->get(id); - member->loc = record->id; - member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)}; - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - *out = id; - return PlanStage::ADVANCED; - } - } - } - catch (const WriteConflictException& wce) { + // If this throws WCE, it leave us in a state were the next call to work will retry. + switchToExtentHopping(); + } catch (const WriteConflictException& wce) { + _subIterators.clear(); *out = WorkingSet::INVALID_ID; - return PlanStage::NEED_YIELD; + return NEED_YIELD; } - - _subIterators.pop_back(); - return PlanStage::NEED_TIME; } - void OplogStart::switchToExtentHopping() { - // Set up our extent hopping state. - _subIterators = _collection->getManyCursors(_txn); + // Don't find it in time? Swing from extent to extent like tarzan.com. + verify(_extentHopping); + return workExtentHopping(out); +} - // Transition from backwards scanning to extent hopping. - _backwardsScanning = false; - _extentHopping = true; +PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) { + if (_done || _subIterators.empty()) { + return PlanStage::IS_EOF; + } - // Toss the collection scan we were using. - _cs.reset(); + // we work from the back to the front since the back has the newest data. + try { + // TODO: should we ever check fetcherForNext()? + if (auto record = _subIterators.back()->next()) { + BSONObj obj = record->data.releaseToBson(); + if (!_filter->matchesBSON(obj)) { + _done = true; + WorkingSetID id = _workingSet->allocate(); + WorkingSetMember* member = _workingSet->get(id); + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)}; + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + *out = id; + return PlanStage::ADVANCED; + } + } + } catch (const WriteConflictException& wce) { + *out = WorkingSet::INVALID_ID; + return PlanStage::NEED_YIELD; } - PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) { - PlanStage::StageState state = _cs->work(out); + _subIterators.pop_back(); + return PlanStage::NEED_TIME; +} - // EOF. Just start from the beginning, which is where we've hit. - if (PlanStage::IS_EOF == state) { - _done = true; - return state; - } +void OplogStart::switchToExtentHopping() { + // Set up our extent hopping state. + _subIterators = _collection->getManyCursors(_txn); - if (PlanStage::ADVANCED != state) { return state; } + // Transition from backwards scanning to extent hopping. + _backwardsScanning = false; + _extentHopping = true; - WorkingSetMember* member = _workingSet->get(*out); - verify(member->hasObj()); - verify(member->hasLoc()); + // Toss the collection scan we were using. + _cs.reset(); +} - if (!_filter->matchesBSON(member->obj.value())) { - _done = true; - // RecordId is returned in *out. - return PlanStage::ADVANCED; - } - else { - _workingSet->free(*out); - return PlanStage::NEED_TIME; - } +PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) { + PlanStage::StageState state = _cs->work(out); + + // EOF. Just start from the beginning, which is where we've hit. + if (PlanStage::IS_EOF == state) { + _done = true; + return state; } - bool OplogStart::isEOF() { return _done; } + if (PlanStage::ADVANCED != state) { + return state; + } - void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { - if (_needInit) { return; } + WorkingSetMember* member = _workingSet->get(*out); + verify(member->hasObj()); + verify(member->hasLoc()); - if (INVALIDATION_DELETION != type) { return; } + if (!_filter->matchesBSON(member->obj.value())) { + _done = true; + // RecordId is returned in *out. + return PlanStage::ADVANCED; + } else { + _workingSet->free(*out); + return PlanStage::NEED_TIME; + } +} - if (_cs) { - _cs->invalidate(txn, dl, type); - } +bool OplogStart::isEOF() { + return _done; +} - for (size_t i = 0; i < _subIterators.size(); i++) { - _subIterators[i]->invalidate(dl); - } +void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { + if (_needInit) { + return; } - void OplogStart::saveState() { - _txn = NULL; - if (_cs) { - _cs->saveState(); - } + if (INVALIDATION_DELETION != type) { + return; + } - for (size_t i = 0; i < _subIterators.size(); i++) { - _subIterators[i]->savePositioned(); - } + if (_cs) { + _cs->invalidate(txn, dl, type); } - void OplogStart::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - if (_cs) { - _cs->restoreState(opCtx); - } + for (size_t i = 0; i < _subIterators.size(); i++) { + _subIterators[i]->invalidate(dl); + } +} - for (size_t i = 0; i < _subIterators.size(); i++) { - if (!_subIterators[i]->restore(opCtx)) { - _subIterators.erase(_subIterators.begin() + i); - // need to hit same i on next pass through loop - i--; - } - } +void OplogStart::saveState() { + _txn = NULL; + if (_cs) { + _cs->saveState(); } - PlanStageStats* OplogStart::getStats() { - std::unique_ptr<PlanStageStats> ret(new PlanStageStats(CommonStats(kStageType), - STAGE_OPLOG_START)); - ret->specific.reset(new CollectionScanStats()); - return ret.release(); + for (size_t i = 0; i < _subIterators.size(); i++) { + _subIterators[i]->savePositioned(); } +} - vector<PlanStage*> OplogStart::getChildren() const { - vector<PlanStage*> empty; - return empty; +void OplogStart::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + if (_cs) { + _cs->restoreState(opCtx); } - int OplogStart::_backwardsScanTime = 5; + for (size_t i = 0; i < _subIterators.size(); i++) { + if (!_subIterators[i]->restore(opCtx)) { + _subIterators.erase(_subIterators.begin() + i); + // need to hit same i on next pass through loop + i--; + } + } +} + +PlanStageStats* OplogStart::getStats() { + std::unique_ptr<PlanStageStats> ret( + new PlanStageStats(CommonStats(kStageType), STAGE_OPLOG_START)); + ret->specific.reset(new CollectionScanStats()); + return ret.release(); +} + +vector<PlanStage*> OplogStart::getChildren() const { + vector<PlanStage*> empty; + return empty; +} + +int OplogStart::_backwardsScanTime = 5; } // namespace mongo |