/**
 *    Copyright (C) 2013-2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/exec/oplogstart.h"

#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"

namespace mongo {

using std::vector;

// Stage-type label reported in explain/stats output.
const char* OplogStart::kStageType = "OPLOG_START";

// Does not take ownership.
OplogStart::OplogStart(OperationContext* txn, const Collection* collection, MatchExpression* filter, WorkingSet* ws) : _txn(txn), _needInit(true), _backwardsScanning(false), _extentHopping(false), _done(false), _collection(collection), _workingSet(ws), _filter(filter) { } OplogStart::~OplogStart() { } PlanStage::StageState OplogStart::work(WorkingSetID* out) { // We do our (heavy) init in a work(), where work is expected. if (_needInit) { CollectionScanParams params; params.collection = _collection; params.direction = CollectionScanParams::BACKWARD; _cs.reset(new CollectionScan(_txn, params, _workingSet, NULL)); _needInit = false; _backwardsScanning = true; _timer.reset(); } // If we're still reading backwards, keep trying until timing out. if (_backwardsScanning) { verify(!_extentHopping); // Still have time to succeed with reading backwards. if (_timer.seconds() < _backwardsScanTime) { return workBackwardsScan(out); } try { // If this throws WCE, it leave us in a state were the next call to work will retry. switchToExtentHopping(); } catch (const WriteConflictException& wce) { _subIterators.clear(); *out = WorkingSet::INVALID_ID; return NEED_YIELD; } } // Don't find it in time? Swing from extent to extent like tarzan.com. verify(_extentHopping); return workExtentHopping(out); } PlanStage::StageState OplogStart::workExtentHopping(WorkingSetID* out) { if (_done || _subIterators.empty()) { return PlanStage::IS_EOF; } // we work from the back to the front since the back has the newest data. try { // TODO: should we ever check fetcherForNext()? 
if (auto record = _subIterators.back()->next()) { BSONObj obj = record->data.releaseToBson(); if (!_filter->matchesBSON(obj)) { _done = true; WorkingSetID id = _workingSet->allocate(); WorkingSetMember* member = _workingSet->get(id); member->loc = record->id; member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)}; member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; *out = id; return PlanStage::ADVANCED; } } } catch (const WriteConflictException& wce) { *out = WorkingSet::INVALID_ID; return PlanStage::NEED_YIELD; } _subIterators.pop_back(); return PlanStage::NEED_TIME; } void OplogStart::switchToExtentHopping() { // Set up our extent hopping state. _subIterators = _collection->getManyCursors(_txn); // Transition from backwards scanning to extent hopping. _backwardsScanning = false; _extentHopping = true; // Toss the collection scan we were using. _cs.reset(); } PlanStage::StageState OplogStart::workBackwardsScan(WorkingSetID* out) { PlanStage::StageState state = _cs->work(out); // EOF. Just start from the beginning, which is where we've hit. if (PlanStage::IS_EOF == state) { _done = true; return state; } if (PlanStage::ADVANCED != state) { return state; } WorkingSetMember* member = _workingSet->get(*out); verify(member->hasObj()); verify(member->hasLoc()); if (!_filter->matchesBSON(member->obj.value())) { _done = true; // RecordId is returned in *out. 
return PlanStage::ADVANCED; } else { _workingSet->free(*out); return PlanStage::NEED_TIME; } } bool OplogStart::isEOF() { return _done; } void OplogStart::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { if (_needInit) { return; } if (INVALIDATION_DELETION != type) { return; } if (_cs) { _cs->invalidate(txn, dl, type); } for (size_t i = 0; i < _subIterators.size(); i++) { _subIterators[i]->invalidate(dl); } } void OplogStart::saveState() { _txn = NULL; if (_cs) { _cs->saveState(); } for (size_t i = 0; i < _subIterators.size(); i++) { _subIterators[i]->savePositioned(); } } void OplogStart::restoreState(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; if (_cs) { _cs->restoreState(opCtx); } for (size_t i = 0; i < _subIterators.size(); i++) { if (!_subIterators[i]->restore(opCtx)) { _subIterators.erase(_subIterators.begin() + i); // need to hit same i on next pass through loop i--; } } } PlanStageStats* OplogStart::getStats() { std::unique_ptr ret(new PlanStageStats(CommonStats(kStageType), STAGE_OPLOG_START)); ret->specific.reset(new CollectionScanStats()); return ret.release(); } vector OplogStart::getChildren() const { vector empty; return empty; } int OplogStart::_backwardsScanTime = 5; } // namespace mongo