diff options
Diffstat (limited to 'src/mongo/db/exec/multi_iterator.cpp')
-rw-r--r-- | src/mongo/db/exec/multi_iterator.cpp | 188 |
1 files changed, 92 insertions, 96 deletions
diff --git a/src/mongo/db/exec/multi_iterator.cpp b/src/mongo/db/exec/multi_iterator.cpp index f8aeaac8ca5..fe955cb05af 100644 --- a/src/mongo/db/exec/multi_iterator.cpp +++ b/src/mongo/db/exec/multi_iterator.cpp @@ -36,100 +36,96 @@ namespace mongo { - using std::vector; - - const char* MultiIteratorStage::kStageType = "MULTI_ITERATOR"; - - MultiIteratorStage::MultiIteratorStage(OperationContext* txn, - WorkingSet* ws, - Collection* collection) - : _txn(txn), - _collection(collection), - _ws(ws), - _wsidForFetch(_ws->allocate()) { - // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used - // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but - // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. - WorkingSetMember* member = _ws->get(_wsidForFetch); - member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; +using std::vector; + +const char* MultiIteratorStage::kStageType = "MULTI_ITERATOR"; + +MultiIteratorStage::MultiIteratorStage(OperationContext* txn, + WorkingSet* ws, + Collection* collection) + : _txn(txn), _collection(collection), _ws(ws), _wsidForFetch(_ws->allocate()) { + // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used + // for anything other than passing up NEED_YIELD. We use the loc and owned obj state, but + // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. + WorkingSetMember* member = _ws->get(_wsidForFetch); + member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; +} + +void MultiIteratorStage::addIterator(std::unique_ptr<RecordCursor> it) { + _iterators.push_back(std::move(it)); +} + +PlanStage::StageState MultiIteratorStage::work(WorkingSetID* out) { + if (_collection == NULL) { + Status status(ErrorCodes::InternalError, "MultiIteratorStage died on null collection"); + *out = WorkingSetCommon::allocateStatusMember(_ws, status); + return PlanStage::DEAD; } - void MultiIteratorStage::addIterator(std::unique_ptr<RecordCursor> it) { - _iterators.push_back(std::move(it)); - } - - PlanStage::StageState MultiIteratorStage::work(WorkingSetID* out) { - if (_collection == NULL) { - Status status(ErrorCodes::InternalError, - "MultiIteratorStage died on null collection"); - *out = WorkingSetCommon::allocateStatusMember(_ws, status); - return PlanStage::DEAD; - } - - boost::optional<Record> record; - try { - while (!_iterators.empty()) { - if (auto fetcher = _iterators.back()->fetcherForNext()) { - // Pass the RecordFetcher off up. - WorkingSetMember* member = _ws->get(_wsidForFetch); - member->setFetcher(fetcher.release()); - *out = _wsidForFetch; - return NEED_YIELD; - } - - record = _iterators.back()->next(); - if (record) break; - _iterators.pop_back(); + boost::optional<Record> record; + try { + while (!_iterators.empty()) { + if (auto fetcher = _iterators.back()->fetcherForNext()) { + // Pass the RecordFetcher off up. + WorkingSetMember* member = _ws->get(_wsidForFetch); + member->setFetcher(fetcher.release()); + *out = _wsidForFetch; + return NEED_YIELD; } - } - catch (const WriteConflictException& wce) { - // If _advance throws a WCE we shouldn't have moved. - invariant(!_iterators.empty()); - *out = WorkingSet::INVALID_ID; - return NEED_YIELD; - } - - if (!record) - return IS_EOF; - - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->loc = record->id; - member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - return PlanStage::ADVANCED; - } - bool MultiIteratorStage::isEOF() { - return _collection == NULL || _iterators.empty(); - } - - void MultiIteratorStage::kill() { - _collection = NULL; - _iterators.clear(); - } - - void MultiIteratorStage::saveState() { - _txn = NULL; - for (size_t i = 0; i < _iterators.size(); i++) { - _iterators[i]->savePositioned(); + record = _iterators.back()->next(); + if (record) + break; + _iterators.pop_back(); } + } catch (const WriteConflictException& wce) { + // If _advance throws a WCE we shouldn't have moved. + invariant(!_iterators.empty()); + *out = WorkingSet::INVALID_ID; + return NEED_YIELD; } - void MultiIteratorStage::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - for (size_t i = 0; i < _iterators.size(); i++) { - if (!_iterators[i]->restore(opCtx)) { - kill(); - } + if (!record) + return IS_EOF; + + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + return PlanStage::ADVANCED; +} + +bool MultiIteratorStage::isEOF() { + return _collection == NULL || _iterators.empty(); +} + +void MultiIteratorStage::kill() { + _collection = NULL; + _iterators.clear(); +} + +void MultiIteratorStage::saveState() { + _txn = NULL; + for (size_t i = 0; i < _iterators.size(); i++) { + _iterators[i]->savePositioned(); + } +} + +void MultiIteratorStage::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + for (size_t i = 0; i < _iterators.size(); i++) { + if (!_iterators[i]->restore(opCtx)) { + kill(); } } +} - void MultiIteratorStage::invalidate(OperationContext* txn, - const RecordId& dl, - InvalidationType type) { - switch ( type ) { +void MultiIteratorStage::invalidate(OperationContext* txn, + const RecordId& dl, + InvalidationType type) { + switch (type) { case INVALIDATION_DELETION: for (size_t i = 0; i < _iterators.size(); i++) { _iterators[i]->invalidate(dl); @@ -138,19 +134,19 @@ namespace mongo { case INVALIDATION_MUTATION: // no-op break; - } } +} - vector<PlanStage*> MultiIteratorStage::getChildren() const { - vector<PlanStage*> empty; - return empty; - } +vector<PlanStage*> MultiIteratorStage::getChildren() const { + vector<PlanStage*> empty; + return empty; +} - PlanStageStats* MultiIteratorStage::getStats() { - std::unique_ptr<PlanStageStats> ret(new PlanStageStats(CommonStats(kStageType), - STAGE_MULTI_ITERATOR)); - ret->specific.reset(new CollectionScanStats()); - return ret.release(); - } +PlanStageStats* MultiIteratorStage::getStats() { + std::unique_ptr<PlanStageStats> ret( + new PlanStageStats(CommonStats(kStageType), STAGE_MULTI_ITERATOR)); + ret->specific.reset(new CollectionScanStats()); + return ret.release(); +} -} // namespace mongo +} // namespace mongo |