/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/exec/filter.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/storage/record_fetcher.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using std::unique_ptr; using std::vector; // static const char* FetchStage::kStageType = "FETCH"; FetchStage::FetchStage(OperationContext* txn, WorkingSet* ws, PlanStage* child, const MatchExpression* filter, const Collection* collection) : _txn(txn), _collection(collection), _ws(ws), _child(child), _filter(filter), _idRetrying(WorkingSet::INVALID_ID), _commonStats(kStageType) { } FetchStage::~FetchStage() { } bool FetchStage::isEOF() { if (WorkingSet::INVALID_ID != _idRetrying) { // We asked the parent for a page-in, but still haven't had a chance to return the // paged in document return false; } return _child->isEOF(); } PlanStage::StageState FetchStage::work(WorkingSetID* out) { ++_commonStats.works; // Adds the amount of time taken by work() to executionTimeMillis. ScopedTimer timer(&_commonStats.executionTimeMillis); if (isEOF()) { return PlanStage::IS_EOF; } // Either retry the last WSM we worked on or get a new one from our child. WorkingSetID id; StageState status; if (_idRetrying == WorkingSet::INVALID_ID) { status = _child->work(&id); } else { status = ADVANCED; id = _idRetrying; _idRetrying = WorkingSet::INVALID_ID; } if (PlanStage::ADVANCED == status) { WorkingSetMember* member = _ws->get(id); // If there's an obj there, there is no fetching to perform. if (member->hasObj()) { ++_specificStats.alreadyHasObj; } else { // We need a valid loc to fetch from and this is the only state that has one. verify(WorkingSetMember::LOC_AND_IDX == member->state); verify(member->hasLoc()); try { if (!_cursor) _cursor = _collection->getCursor(_txn); if (auto fetcher = _cursor->fetcherForId(member->loc)) { // There's something to fetch. Hand the fetcher off to the WSM, and pass up // a fetch request. _idRetrying = id; member->setFetcher(fetcher.release()); *out = id; _commonStats.needYield++; return NEED_YIELD; } // The doc is already in memory, so go ahead and grab it. Now we have a RecordId // as well as an unowned object if (!WorkingSetCommon::fetch(_txn, member, _cursor)) { _ws->free(id); _commonStats.needTime++; return NEED_TIME; } } catch (const WriteConflictException& wce) { _idRetrying = id; *out = WorkingSet::INVALID_ID; _commonStats.needYield++; return NEED_YIELD; } } return returnIfMatches(member, id, out); } else if (PlanStage::FAILURE == status || PlanStage::DEAD == status) { *out = id; // If a stage fails, it may create a status WSM to indicate why it // failed, in which case 'id' is valid. If ID is invalid, we // create our own error message. if (WorkingSet::INVALID_ID == id) { mongoutils::str::stream ss; ss << "fetch stage failed to read in results from child"; Status status(ErrorCodes::InternalError, ss); *out = WorkingSetCommon::allocateStatusMember( _ws, status); } return status; } else if (PlanStage::NEED_TIME == status) { ++_commonStats.needTime; } else if (PlanStage::NEED_YIELD == status) { ++_commonStats.needYield; *out = id; } return status; } void FetchStage::saveState() { _txn = NULL; ++_commonStats.yields; if (_cursor) _cursor->saveUnpositioned(); _child->saveState(); } void FetchStage::restoreState(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; ++_commonStats.unyields; if (_cursor) _cursor->restore(opCtx); _child->restoreState(opCtx); } void FetchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { ++_commonStats.invalidates; _child->invalidate(txn, dl, type); // It's possible that the loc getting invalidated is the one we're about to // fetch. In this case we do a "forced fetch" and put the WSM in owned object state. if (WorkingSet::INVALID_ID != _idRetrying) { WorkingSetMember* member = _ws->get(_idRetrying); if (member->hasLoc() && (member->loc == dl)) { // Fetch it now and kill the diskloc. WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); } } } PlanStage::StageState FetchStage::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { // We consider "examining a document" to be every time that we pass a document through // a filter by calling Filter::passes(...) below. Therefore, the 'docsExamined' metric // is not always equal to the number of documents that were fetched from the collection. // In particular, we can sometimes generate plans which have two fetch stages. The first // one actually grabs the document from the collection, and the second passes the // document through a second filter. // // One common example of this is geoNear. Suppose that a geoNear plan is searching an // annulus to find 2dsphere-indexed documents near some point (x, y) on the globe. // After fetching documents within geo hashes that intersect this annulus, the docs are // fetched and filtered to make sure that they really do fall into this annulus. However, // the user might also want to find only those documents for which accommodationType== // "restaurant". The planner will add a second fetch stage to filter by this non-geo // predicate. ++_specificStats.docsExamined; if (Filter::passes(member, _filter)) { *out = memberID; ++_commonStats.advanced; return PlanStage::ADVANCED; } else { _ws->free(memberID); ++_commonStats.needTime; return PlanStage::NEED_TIME; } } vector FetchStage::getChildren() const { vector children; children.push_back(_child.get()); return children; } PlanStageStats* FetchStage::getStats() { _commonStats.isEOF = isEOF(); // Add a BSON representation of the filter to the stats tree, if there is one. if (NULL != _filter) { BSONObjBuilder bob; _filter->toBSON(&bob); _commonStats.filter = bob.obj(); } unique_ptr ret(new PlanStageStats(_commonStats, STAGE_FETCH)); ret->specific.reset(new FetchStats(_specificStats)); ret->children.push_back(_child->getStats()); return ret.release(); } const CommonStats* FetchStage::getCommonStats() const { return &_commonStats; } const SpecificStats* FetchStage::getSpecificStats() const { return &_specificStats; } } // namespace mongo