diff options
author | Hari Khalsa <hkhalsa@10gen.com> | 2014-02-10 17:44:53 -0500 |
---|---|---|
committer | Hari Khalsa <hkhalsa@10gen.com> | 2014-02-13 12:25:10 -0500 |
commit | 82f354996edb1e6726de37aee0ca17947a55fe0b (patch) | |
tree | 2948a9e15d4d24bc166fb4ea0ef68607e3fa67a6 /src/mongo | |
parent | 95fd16c297aba152feb99da9653ea234168ec257 (diff) | |
download | mongo-82f354996edb1e6726de37aee0ca17947a55fe0b.tar.gz |
SERVER-12677 text stage can work() incrementally
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/exec/text.cpp | 274 | ||||
-rw-r--r-- | src/mongo/db/exec/text.h | 70 |
2 files changed, 174 insertions, 170 deletions
diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp index a14e7bc808f..77490a79058 100644 --- a/src/mongo/db/exec/text.cpp +++ b/src/mongo/db/exec/text.cpp @@ -45,83 +45,71 @@ namespace mongo { _ftsMatcher(params.query, params.spec), _ws(ws), _filter(filter), - _filledOutResults(false), - _curResult(0) { } + _internalState(INIT_SCANS), + _currentIndexScanner(0) { + + _scoreIterator = _scores.end(); + } TextStage::~TextStage() { } bool TextStage::isEOF() { - // If we haven't filled out our results yet we can't be EOF. - if (!_filledOutResults) { return false; } - - // We're EOF when we've returned all our results. - return _curResult >= _results.size(); + return _internalState == DONE; } PlanStage::StageState TextStage::work(WorkingSetID* out) { ++_commonStats.works; - if (isEOF()) { return PlanStage::IS_EOF; } - - // Fill out our result queue. - if (!_filledOutResults) { - PlanStage::StageState ss = fillOutResults(out); - if (ss == PlanStage::IS_EOF || ss == PlanStage::FAILURE) { - return ss; - } - verify(ss == PlanStage::NEED_TIME); - } - // Having cached all our results, return them one at a time. - WorkingSetID id = _results[_curResult]; - - // Advance to next result. - ++_curResult; - *out = id; + if (isEOF()) { return PlanStage::IS_EOF; } - // If we're returning something, take it out of our DL -> WSID map so that future - // calls to invalidate don't cause us to take action for a DL we're done with. - WorkingSetMember* member = _ws->get(*out); - if (member->hasLoc()) { - _wsidByDiskLoc.erase(member->loc); + switch (_internalState) { + case INIT_SCANS: + return initScans(out); + case READING_TERMS: + return readFromSubScanners(out); + case RETURNING_RESULTS: + return returnResults(out); + case DONE: + return PlanStage::IS_EOF; } - return PlanStage::ADVANCED; + // Not reached. + return PlanStage::IS_EOF; } void TextStage::prepareToYield() { ++_commonStats.yields; - // TODO: When we incrementally read results, tell our sub-runners to yield. + + for (size_t i = 0; i < _scanners.size(); ++i) { + _scanners.mutableVector()[i]->prepareToYield(); + } } void TextStage::recoverFromYield() { ++_commonStats.unyields; - // TODO: When we incrementally read results, tell our sub-runners to unyield. + + for (size_t i = 0; i < _scanners.size(); ++i) { + _scanners.mutableVector()[i]->recoverFromYield(); + } } void TextStage::invalidate(const DiskLoc& dl, InvalidationType type) { ++_commonStats.invalidates; - // Invalidation does not affect the number of results added in fillOutResults(). - // All it affects is whether the WSM returned to the caller has a DiskLoc. - - // _results contains indices into the WorkingSet, not actual data. If a WorkingSetMember in - // the WorkingSet needs to change state as a result of a DiskLoc invalidation, it will still - // be at the same spot in the WorkingSet. As such, we don't need to modify _results. - DataMap::iterator it = _wsidByDiskLoc.find(dl); - - // If we're holding on to data that's got the DiskLoc we're invalidating... - if (_wsidByDiskLoc.end() != it) { - // Grab the WSM that we're converting from LOC_AND_UNOWNED to OWNED_OBJ. - WorkingSetMember* member = _ws->get(it->second); - verify(member->loc == dl); - verify(member->state == WorkingSetMember::LOC_AND_UNOWNED_OBJ); - - member->loc.Null(); - member->obj = member->obj.getOwned(); - member->state = WorkingSetMember::OWNED_OBJ; + // Propagate invalidate to children. + for (size_t i = 0; i < _scanners.size(); ++i) { + _scanners.mutableVector()[i]->invalidate(dl, type); + } - // Remove the DiskLoc from our set of active DLs. - _wsidByDiskLoc.erase(it); + // We store the score keyed by DiskLoc. We have to toss out our state when the DiskLoc + // changes. + // TODO: If we're RETURNING_RESULTS we could somehow buffer the object. + ScoreMap::iterator scoreIt = _scores.find(dl); + if (scoreIt != _scores.end()) { + if (scoreIt == _scoreIterator) { + _scoreIterator++; + } + _scores.erase(scoreIt); } } @@ -132,136 +120,116 @@ namespace mongo { return ret.release(); } - PlanStage::StageState TextStage::fillOutResults(WorkingSetID* out) { - Database* db = cc().database(); - Collection* collection = db->getCollection( _params.ns ); - if (NULL == collection) { - std::string errmsg = mongoutils::str::stream() << "TextStage params namespace error"; - warning() << errmsg; - Status status(ErrorCodes::NamespaceNotFound, errmsg); - *out = WorkingSetCommon::allocateStatusMember( _ws, status); - return PlanStage::FAILURE; - } - vector<IndexDescriptor*> idxMatches; - collection->getIndexCatalog()->findIndexByType("text", idxMatches); - if (1 != idxMatches.size()) { - std::string errmsg = mongoutils::str::stream() << "Expected exactly one text index"; - warning() << errmsg; - // Using IndexNotFound error code because we are unable to - // determine which index to select. - Status status(ErrorCodes::IndexNotFound, errmsg); - *out = WorkingSetCommon::allocateStatusMember( _ws, status); - return PlanStage::FAILURE; - } + PlanStage::StageState TextStage::initScans(WorkingSetID* out) { + invariant(0 == _scanners.size()); // Get all the index scans for each term in our query. - OwnedPointerVector<PlanStage> scanners; for (size_t i = 0; i < _params.query.getTerms().size(); i++) { const string& term = _params.query.getTerms()[i]; IndexScanParams params; - params.bounds.startKey = FTSIndexFormat::getIndexKey(MAX_WEIGHT, term, + params.bounds.startKey = FTSIndexFormat::getIndexKey(MAX_WEIGHT, + term, _params.indexPrefix); params.bounds.endKey = FTSIndexFormat::getIndexKey(0, term, _params.indexPrefix); params.bounds.endKeyInclusive = true; params.bounds.isSimpleRange = true; - params.descriptor = idxMatches[0]; + params.descriptor = _params.index; params.direction = -1; - IndexScan* ixscan = new IndexScan(params, _ws, NULL); - scanners.mutableVector().push_back(ixscan); + _scanners.mutableVector().push_back(new IndexScan(params, _ws, NULL)); } - // Map: diskloc -> aggregate score for doc. - typedef unordered_map<DiskLoc, double, DiskLoc::Hasher> ScoreMap; - ScoreMap scores; + // If we have no terms we go right to EOF. + if (0 == _scanners.size()) { + _internalState = DONE; + return PlanStage::IS_EOF; + } - // For each index scan, read all results and store scores. - size_t currentIndexScanner = 0; - while (currentIndexScanner < scanners.size()) { - BSONObj keyObj; - DiskLoc loc; + // Transition to the next state. + _internalState = READING_TERMS; + return PlanStage::NEED_TIME; + } - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = scanners.vector()[currentIndexScanner]->work(&id); + PlanStage::StageState TextStage::readFromSubScanners(WorkingSetID* out) { + // This should be checked before we get here. + invariant(_currentIndexScanner < _scanners.size()); + + // Read the next result from our current scanner. + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState childState = _scanners.vector()[_currentIndexScanner]->work(&id); + + if (PlanStage::ADVANCED == childState) { + WorkingSetMember* wsm = _ws->get(id); + invariant(1 == wsm->keyData.size()); + invariant(wsm->hasLoc()); + IndexKeyDatum& keyDatum = wsm->keyData.back(); + addTerm(keyDatum.keyData, wsm->loc); + _ws->free(id); + return PlanStage::NEED_TIME; + } + else if (PlanStage::IS_EOF == childState) { + // Done with this scan. + ++_currentIndexScanner; - if (PlanStage::ADVANCED == state) { - WorkingSetMember* wsm = _ws->get(id); - IndexKeyDatum& keyDatum = wsm->keyData.back(); - filterAndScore(keyDatum.keyData, wsm->loc, &scores[wsm->loc]); - _ws->free(id); - } - else if (PlanStage::IS_EOF == state) { - // Done with this scan. - ++currentIndexScanner; + if (_currentIndexScanner < _scanners.size()) { + // We have another scan to read from. + return PlanStage::NEED_TIME; } - else if (PlanStage::NEED_FETCH == state) { - // We're calling work() on ixscans and they have no way to return a fetch. - verify(false); - } - else if (PlanStage::NEED_TIME == state) { - // We are a blocking stage, so ignore scanner's request for more time. - } - else { - verify(PlanStage::FAILURE == state); - std::string errmsg = mongoutils::str::stream() << - "error from index scan during text stage: invalid FAILURE state"; - warning() << errmsg; - // Propagate error status from underlying index scan if available. - // Otherwise, create a new error status. - if (WorkingSet::INVALID_ID == id) { - // Using InternalError error code because this is very uncommon. - // Currently, there are no code paths in IndexScan::work() that return - // PlanStage::FAILURE. - Status status(ErrorCodes::InternalError, errmsg); - id = WorkingSetCommon::allocateStatusMember( _ws, status); - } + + // If we're here we are done reading results. Move to the next state. + _scoreIterator = _scores.begin(); + _internalState = RETURNING_RESULTS; + + // Don't need to keep these around. + _scanners.clear(); + return PlanStage::NEED_TIME; + } + else { + if (PlanStage::FAILURE == childState) { + // Propagate failure from below. *out = id; - return PlanStage::FAILURE; } + return childState; + } + } + + PlanStage::StageState TextStage::returnResults(WorkingSetID* out) { + if (_scoreIterator == _scores.end()) { + _internalState = DONE; + return PlanStage::IS_EOF; } // Filter for phrases and negative terms, score and truncate. - for (ScoreMap::iterator i = scores.begin(); i != scores.end(); ++i) { - DiskLoc loc = i->first; - double score = i->second; + DiskLoc loc = _scoreIterator->first; + double score = _scoreIterator->second; + _scoreIterator++; - // Ignore non-matched documents. - if (score < 0) { - continue; - } + // Ignore non-matched documents. + if (score < 0) { + return PlanStage::NEED_TIME; + } - // Filter for phrases and negated terms - if (_params.query.hasNonTermPieces()) { - if (!_ftsMatcher.matchesNonTerm(loc.obj())) { - continue; - } + // Filter for phrases and negated terms + if (_params.query.hasNonTermPieces()) { + if (!_ftsMatcher.matchesNonTerm(loc.obj())) { + return PlanStage::NEED_TIME; } - - // Add results to working set as LOC_AND_UNOWNED_OBJ initially. - // On invalidation, we copy the object and change the state to - // OWNED_OBJ. - // Fill out a WSM. - WorkingSetID id = _ws->allocate(); - WorkingSetMember* member = _ws->get(id); - member->loc = loc; - member->obj = member->loc.obj(); - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - member->addComputed(new TextScoreComputedData(score)); - - _results.push_back(id); - _wsidByDiskLoc[member->loc] = id; } - _filledOutResults = true; - - if (_results.size() == 0) { - return PlanStage::IS_EOF; - } - return PlanStage::NEED_TIME; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->loc = loc; + member->obj = member->loc.obj(); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member->addComputed(new TextScoreComputedData(score)); + return PlanStage::ADVANCED; } class TextMatchableDocument : public MatchableDocument { public: - TextMatchableDocument(const BSONObj& keyPattern, const BSONObj& key, DiskLoc loc, bool *fetched) + TextMatchableDocument(const BSONObj& keyPattern, + const BSONObj& key, + DiskLoc loc, + bool *fetched) : _keyPattern(keyPattern), _key(key), _loc(loc), @@ -308,8 +276,8 @@ namespace mongo { bool* _fetched; }; - void TextStage::filterAndScore(BSONObj key, DiskLoc loc, double* documentAggregateScore) { - invariant(documentAggregateScore); + void TextStage::addTerm(const BSONObj& key, const DiskLoc& loc) { + double *documentAggregateScore = &_scores[loc]; ++_specificStats.keysExamined; diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h index 2b27670b067..932e0de2e7c 100644 --- a/src/mongo/db/exec/text.h +++ b/src/mongo/db/exec/text.h @@ -79,6 +79,23 @@ namespace mongo { */ class TextStage : public PlanStage { public: + /** + * The text stage has a few 'states' it transitions between. + */ + enum State { + // 1. Initialize the index scans we use to retrieve term/score info. + INIT_SCANS, + + // 2. Read the terms/scores from the text index. + READING_TERMS, + + // 3. Return results to our parent. + RETURNING_RESULTS, + + // 4. Done. + DONE, + }; + TextStage(const TextStageParams& params, WorkingSet* ws, const MatchExpression* filter); virtual ~TextStage(); @@ -93,14 +110,29 @@ namespace mongo { PlanStageStats* getStats(); private: - // Helper for buffering results array. Returns NEED_TIME (if any results were produced), - // IS_EOF, or FAILURE. - // If the result state is FAILURE, out be set to a valid status member WSID. - StageState fillOutResults(WorkingSetID *out); - - // Helper to update aggregate score with a new-found (term, score) pair for this document. - // Also rejects documents that don't match this stage's filter. - void filterAndScore(BSONObj key, DiskLoc loc, double* documentAggregateScore); + /** + * Initializes sub-scanners. + */ + StageState initScans(WorkingSetID* out); + + /** + * Helper for buffering results array. Returns NEED_TIME (if any results were produced), + * IS_EOF, or FAILURE. + */ + StageState readFromSubScanners(WorkingSetID* out); + + /** + * Helper called from readFromSubScanners to update aggregate score with a new-found (term, + * score) pair for this document. Also rejects documents that don't match this stage's + * filter. + */ + void addTerm(const BSONObj& key, const DiskLoc& loc); + + /** + * Possibly return a result. FYI, this may perform a fetch directly if it is needed to + * evaluate all filters. + */ + StageState returnResults(WorkingSetID* out); // Parameters of this text stage. TextStageParams _params; @@ -118,18 +150,22 @@ namespace mongo { CommonStats _commonStats; TextStats _specificStats; - // State bit for work(). True if results have been buffered. - bool _filledOutResults; + // What state are we in? See the State enum above. + State _internalState; - // WSIDs in result. - std::vector<WorkingSetID> _results; + // Used in INIT_SCANS and READING_TERMS. The index scans we're using to retrieve text + // terms. + OwnedPointerVector<PlanStage> _scanners; - // We want to look up data in the working set by DiskLoc quickly upon invalidation. - typedef unordered_map<DiskLoc, WorkingSetID, DiskLoc::Hasher> DataMap; - DataMap _wsidByDiskLoc; + // Which _scanners are we currently reading from? + size_t _currentIndexScanner; - // The next result to return from work(). - size_t _curResult; + // Temporary score data filled out by sub-scans. Used in READING_TERMS and + // RETURNING_RESULTS. + // Maps from diskloc -> aggregate score for doc. + typedef unordered_map<DiskLoc, double, DiskLoc::Hasher> ScoreMap; + ScoreMap _scores; + ScoreMap::const_iterator _scoreIterator; }; } // namespace mongo |