summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorHari Khalsa <hkhalsa@10gen.com>2014-02-10 17:44:53 -0500
committerHari Khalsa <hkhalsa@10gen.com>2014-02-13 12:25:10 -0500
commit82f354996edb1e6726de37aee0ca17947a55fe0b (patch)
tree2948a9e15d4d24bc166fb4ea0ef68607e3fa67a6 /src/mongo
parent95fd16c297aba152feb99da9653ea234168ec257 (diff)
downloadmongo-82f354996edb1e6726de37aee0ca17947a55fe0b.tar.gz
SERVER-12677 text stage can work() incrementally
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/exec/text.cpp274
-rw-r--r--src/mongo/db/exec/text.h70
2 files changed, 174 insertions, 170 deletions
diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp
index a14e7bc808f..77490a79058 100644
--- a/src/mongo/db/exec/text.cpp
+++ b/src/mongo/db/exec/text.cpp
@@ -45,83 +45,71 @@ namespace mongo {
_ftsMatcher(params.query, params.spec),
_ws(ws),
_filter(filter),
- _filledOutResults(false),
- _curResult(0) { }
+ _internalState(INIT_SCANS),
+ _currentIndexScanner(0) {
+
+ _scoreIterator = _scores.end();
+ }
TextStage::~TextStage() { }
bool TextStage::isEOF() {
- // If we haven't filled out our results yet we can't be EOF.
- if (!_filledOutResults) { return false; }
-
- // We're EOF when we've returned all our results.
- return _curResult >= _results.size();
+ return _internalState == DONE;
}
PlanStage::StageState TextStage::work(WorkingSetID* out) {
++_commonStats.works;
- if (isEOF()) { return PlanStage::IS_EOF; }
-
- // Fill out our result queue.
- if (!_filledOutResults) {
- PlanStage::StageState ss = fillOutResults(out);
- if (ss == PlanStage::IS_EOF || ss == PlanStage::FAILURE) {
- return ss;
- }
- verify(ss == PlanStage::NEED_TIME);
- }
- // Having cached all our results, return them one at a time.
- WorkingSetID id = _results[_curResult];
-
- // Advance to next result.
- ++_curResult;
- *out = id;
+ if (isEOF()) { return PlanStage::IS_EOF; }
- // If we're returning something, take it out of our DL -> WSID map so that future
- // calls to invalidate don't cause us to take action for a DL we're done with.
- WorkingSetMember* member = _ws->get(*out);
- if (member->hasLoc()) {
- _wsidByDiskLoc.erase(member->loc);
+ switch (_internalState) {
+ case INIT_SCANS:
+ return initScans(out);
+ case READING_TERMS:
+ return readFromSubScanners(out);
+ case RETURNING_RESULTS:
+ return returnResults(out);
+ case DONE:
+ return PlanStage::IS_EOF;
}
- return PlanStage::ADVANCED;
+ // Not reached.
+ return PlanStage::IS_EOF;
}
void TextStage::prepareToYield() {
++_commonStats.yields;
- // TODO: When we incrementally read results, tell our sub-runners to yield.
+
+ for (size_t i = 0; i < _scanners.size(); ++i) {
+ _scanners.mutableVector()[i]->prepareToYield();
+ }
}
void TextStage::recoverFromYield() {
++_commonStats.unyields;
- // TODO: When we incrementally read results, tell our sub-runners to unyield.
+
+ for (size_t i = 0; i < _scanners.size(); ++i) {
+ _scanners.mutableVector()[i]->recoverFromYield();
+ }
}
void TextStage::invalidate(const DiskLoc& dl, InvalidationType type) {
++_commonStats.invalidates;
- // Invalidation does not affect the number of results added in fillOutResults().
- // All it affects is whether the WSM returned to the caller has a DiskLoc.
-
- // _results contains indices into the WorkingSet, not actual data. If a WorkingSetMember in
- // the WorkingSet needs to change state as a result of a DiskLoc invalidation, it will still
- // be at the same spot in the WorkingSet. As such, we don't need to modify _results.
- DataMap::iterator it = _wsidByDiskLoc.find(dl);
-
- // If we're holding on to data that's got the DiskLoc we're invalidating...
- if (_wsidByDiskLoc.end() != it) {
- // Grab the WSM that we're converting from LOC_AND_UNOWNED to OWNED_OBJ.
- WorkingSetMember* member = _ws->get(it->second);
- verify(member->loc == dl);
- verify(member->state == WorkingSetMember::LOC_AND_UNOWNED_OBJ);
-
- member->loc.Null();
- member->obj = member->obj.getOwned();
- member->state = WorkingSetMember::OWNED_OBJ;
+ // Propagate invalidate to children.
+ for (size_t i = 0; i < _scanners.size(); ++i) {
+ _scanners.mutableVector()[i]->invalidate(dl, type);
+ }
- // Remove the DiskLoc from our set of active DLs.
- _wsidByDiskLoc.erase(it);
+ // We store the score keyed by DiskLoc. We have to toss out our state when the DiskLoc
+ // changes.
+ // TODO: If we're RETURNING_RESULTS we could somehow buffer the object.
+ ScoreMap::iterator scoreIt = _scores.find(dl);
+ if (scoreIt != _scores.end()) {
+ if (scoreIt == _scoreIterator) {
+ _scoreIterator++;
+ }
+ _scores.erase(scoreIt);
}
}
@@ -132,136 +120,116 @@ namespace mongo {
return ret.release();
}
- PlanStage::StageState TextStage::fillOutResults(WorkingSetID* out) {
- Database* db = cc().database();
- Collection* collection = db->getCollection( _params.ns );
- if (NULL == collection) {
- std::string errmsg = mongoutils::str::stream() << "TextStage params namespace error";
- warning() << errmsg;
- Status status(ErrorCodes::NamespaceNotFound, errmsg);
- *out = WorkingSetCommon::allocateStatusMember( _ws, status);
- return PlanStage::FAILURE;
- }
- vector<IndexDescriptor*> idxMatches;
- collection->getIndexCatalog()->findIndexByType("text", idxMatches);
- if (1 != idxMatches.size()) {
- std::string errmsg = mongoutils::str::stream() << "Expected exactly one text index";
- warning() << errmsg;
- // Using IndexNotFound error code because we are unable to
- // determine which index to select.
- Status status(ErrorCodes::IndexNotFound, errmsg);
- *out = WorkingSetCommon::allocateStatusMember( _ws, status);
- return PlanStage::FAILURE;
- }
+ PlanStage::StageState TextStage::initScans(WorkingSetID* out) {
+ invariant(0 == _scanners.size());
// Get all the index scans for each term in our query.
- OwnedPointerVector<PlanStage> scanners;
for (size_t i = 0; i < _params.query.getTerms().size(); i++) {
const string& term = _params.query.getTerms()[i];
IndexScanParams params;
- params.bounds.startKey = FTSIndexFormat::getIndexKey(MAX_WEIGHT, term,
+ params.bounds.startKey = FTSIndexFormat::getIndexKey(MAX_WEIGHT,
+ term,
_params.indexPrefix);
params.bounds.endKey = FTSIndexFormat::getIndexKey(0, term, _params.indexPrefix);
params.bounds.endKeyInclusive = true;
params.bounds.isSimpleRange = true;
- params.descriptor = idxMatches[0];
+ params.descriptor = _params.index;
params.direction = -1;
- IndexScan* ixscan = new IndexScan(params, _ws, NULL);
- scanners.mutableVector().push_back(ixscan);
+ _scanners.mutableVector().push_back(new IndexScan(params, _ws, NULL));
}
- // Map: diskloc -> aggregate score for doc.
- typedef unordered_map<DiskLoc, double, DiskLoc::Hasher> ScoreMap;
- ScoreMap scores;
+ // If we have no terms we go right to EOF.
+ if (0 == _scanners.size()) {
+ _internalState = DONE;
+ return PlanStage::IS_EOF;
+ }
- // For each index scan, read all results and store scores.
- size_t currentIndexScanner = 0;
- while (currentIndexScanner < scanners.size()) {
- BSONObj keyObj;
- DiskLoc loc;
+ // Transition to the next state.
+ _internalState = READING_TERMS;
+ return PlanStage::NEED_TIME;
+ }
- WorkingSetID id = WorkingSet::INVALID_ID;
- PlanStage::StageState state = scanners.vector()[currentIndexScanner]->work(&id);
+ PlanStage::StageState TextStage::readFromSubScanners(WorkingSetID* out) {
+ // This should be checked before we get here.
+ invariant(_currentIndexScanner < _scanners.size());
+
+ // Read the next result from our current scanner.
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState childState = _scanners.vector()[_currentIndexScanner]->work(&id);
+
+ if (PlanStage::ADVANCED == childState) {
+ WorkingSetMember* wsm = _ws->get(id);
+ invariant(1 == wsm->keyData.size());
+ invariant(wsm->hasLoc());
+ IndexKeyDatum& keyDatum = wsm->keyData.back();
+ addTerm(keyDatum.keyData, wsm->loc);
+ _ws->free(id);
+ return PlanStage::NEED_TIME;
+ }
+ else if (PlanStage::IS_EOF == childState) {
+ // Done with this scan.
+ ++_currentIndexScanner;
- if (PlanStage::ADVANCED == state) {
- WorkingSetMember* wsm = _ws->get(id);
- IndexKeyDatum& keyDatum = wsm->keyData.back();
- filterAndScore(keyDatum.keyData, wsm->loc, &scores[wsm->loc]);
- _ws->free(id);
- }
- else if (PlanStage::IS_EOF == state) {
- // Done with this scan.
- ++currentIndexScanner;
+ if (_currentIndexScanner < _scanners.size()) {
+ // We have another scan to read from.
+ return PlanStage::NEED_TIME;
}
- else if (PlanStage::NEED_FETCH == state) {
- // We're calling work() on ixscans and they have no way to return a fetch.
- verify(false);
- }
- else if (PlanStage::NEED_TIME == state) {
- // We are a blocking stage, so ignore scanner's request for more time.
- }
- else {
- verify(PlanStage::FAILURE == state);
- std::string errmsg = mongoutils::str::stream() <<
- "error from index scan during text stage: invalid FAILURE state";
- warning() << errmsg;
- // Propagate error status from underlying index scan if available.
- // Otherwise, create a new error status.
- if (WorkingSet::INVALID_ID == id) {
- // Using InternalError error code because this is very uncommon.
- // Currently, there are no code paths in IndexScan::work() that return
- // PlanStage::FAILURE.
- Status status(ErrorCodes::InternalError, errmsg);
- id = WorkingSetCommon::allocateStatusMember( _ws, status);
- }
+
+ // If we're here we are done reading results. Move to the next state.
+ _scoreIterator = _scores.begin();
+ _internalState = RETURNING_RESULTS;
+
+ // Don't need to keep these around.
+ _scanners.clear();
+ return PlanStage::NEED_TIME;
+ }
+ else {
+ if (PlanStage::FAILURE == childState) {
+ // Propagate failure from below.
*out = id;
- return PlanStage::FAILURE;
}
+ return childState;
+ }
+ }
+
+ PlanStage::StageState TextStage::returnResults(WorkingSetID* out) {
+ if (_scoreIterator == _scores.end()) {
+ _internalState = DONE;
+ return PlanStage::IS_EOF;
}
// Filter for phrases and negative terms, score and truncate.
- for (ScoreMap::iterator i = scores.begin(); i != scores.end(); ++i) {
- DiskLoc loc = i->first;
- double score = i->second;
+ DiskLoc loc = _scoreIterator->first;
+ double score = _scoreIterator->second;
+ _scoreIterator++;
- // Ignore non-matched documents.
- if (score < 0) {
- continue;
- }
+ // Ignore non-matched documents.
+ if (score < 0) {
+ return PlanStage::NEED_TIME;
+ }
- // Filter for phrases and negated terms
- if (_params.query.hasNonTermPieces()) {
- if (!_ftsMatcher.matchesNonTerm(loc.obj())) {
- continue;
- }
+ // Filter for phrases and negated terms
+ if (_params.query.hasNonTermPieces()) {
+ if (!_ftsMatcher.matchesNonTerm(loc.obj())) {
+ return PlanStage::NEED_TIME;
}
-
- // Add results to working set as LOC_AND_UNOWNED_OBJ initially.
- // On invalidation, we copy the object and change the state to
- // OWNED_OBJ.
- // Fill out a WSM.
- WorkingSetID id = _ws->allocate();
- WorkingSetMember* member = _ws->get(id);
- member->loc = loc;
- member->obj = member->loc.obj();
- member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
- member->addComputed(new TextScoreComputedData(score));
-
- _results.push_back(id);
- _wsidByDiskLoc[member->loc] = id;
}
- _filledOutResults = true;
-
- if (_results.size() == 0) {
- return PlanStage::IS_EOF;
- }
- return PlanStage::NEED_TIME;
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->loc = loc;
+ member->obj = member->loc.obj();
+ member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
+ member->addComputed(new TextScoreComputedData(score));
+ return PlanStage::ADVANCED;
}
class TextMatchableDocument : public MatchableDocument {
public:
- TextMatchableDocument(const BSONObj& keyPattern, const BSONObj& key, DiskLoc loc, bool *fetched)
+ TextMatchableDocument(const BSONObj& keyPattern,
+ const BSONObj& key,
+ DiskLoc loc,
+ bool *fetched)
: _keyPattern(keyPattern),
_key(key),
_loc(loc),
@@ -308,8 +276,8 @@ namespace mongo {
bool* _fetched;
};
- void TextStage::filterAndScore(BSONObj key, DiskLoc loc, double* documentAggregateScore) {
- invariant(documentAggregateScore);
+ void TextStage::addTerm(const BSONObj& key, const DiskLoc& loc) {
+ double *documentAggregateScore = &_scores[loc];
++_specificStats.keysExamined;
diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h
index 2b27670b067..932e0de2e7c 100644
--- a/src/mongo/db/exec/text.h
+++ b/src/mongo/db/exec/text.h
@@ -79,6 +79,23 @@ namespace mongo {
*/
class TextStage : public PlanStage {
public:
+ /**
+ * The text stage has a few 'states' it transitions between.
+ */
+ enum State {
+ // 1. Initialize the index scans we use to retrieve term/score info.
+ INIT_SCANS,
+
+ // 2. Read the terms/scores from the text index.
+ READING_TERMS,
+
+ // 3. Return results to our parent.
+ RETURNING_RESULTS,
+
+ // 4. Done.
+ DONE,
+ };
+
TextStage(const TextStageParams& params, WorkingSet* ws, const MatchExpression* filter);
virtual ~TextStage();
@@ -93,14 +110,29 @@ namespace mongo {
PlanStageStats* getStats();
private:
- // Helper for buffering results array. Returns NEED_TIME (if any results were produced),
- // IS_EOF, or FAILURE.
- // If the result state is FAILURE, out be set to a valid status member WSID.
- StageState fillOutResults(WorkingSetID *out);
-
- // Helper to update aggregate score with a new-found (term, score) pair for this document.
- // Also rejects documents that don't match this stage's filter.
- void filterAndScore(BSONObj key, DiskLoc loc, double* documentAggregateScore);
+ /**
+ * Initializes sub-scanners.
+ */
+ StageState initScans(WorkingSetID* out);
+
+ /**
+     * Helper for reading one result from the current index sub-scanner and buffering its
+     * term/score data in _scores. Returns NEED_TIME (if any results were produced),
+     * IS_EOF, or FAILURE.
+ */
+ StageState readFromSubScanners(WorkingSetID* out);
+
+ /**
+ * Helper called from readFromSubScanners to update aggregate score with a new-found (term,
+ * score) pair for this document. Also rejects documents that don't match this stage's
+ * filter.
+ */
+ void addTerm(const BSONObj& key, const DiskLoc& loc);
+
+ /**
+     * Possibly return a result. Note that this may fetch the document directly if that is
+     * required to evaluate all filters.
+ */
+ StageState returnResults(WorkingSetID* out);
// Parameters of this text stage.
TextStageParams _params;
@@ -118,18 +150,22 @@ namespace mongo {
CommonStats _commonStats;
TextStats _specificStats;
- // State bit for work(). True if results have been buffered.
- bool _filledOutResults;
+ // What state are we in? See the State enum above.
+ State _internalState;
- // WSIDs in result.
- std::vector<WorkingSetID> _results;
+ // Used in INIT_SCANS and READING_TERMS. The index scans we're using to retrieve text
+ // terms.
+ OwnedPointerVector<PlanStage> _scanners;
- // We want to look up data in the working set by DiskLoc quickly upon invalidation.
- typedef unordered_map<DiskLoc, WorkingSetID, DiskLoc::Hasher> DataMap;
- DataMap _wsidByDiskLoc;
+ // Which _scanners are we currently reading from?
+ size_t _currentIndexScanner;
- // The next result to return from work().
- size_t _curResult;
+ // Temporary score data filled out by sub-scans. Used in READING_TERMS and
+ // RETURNING_RESULTS.
+ // Maps from diskloc -> aggregate score for doc.
+ typedef unordered_map<DiskLoc, double, DiskLoc::Hasher> ScoreMap;
+ ScoreMap _scores;
+ ScoreMap::const_iterator _scoreIterator;
};
} // namespace mongo