summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Chelminski <adam.chelminski@mongodb.com>2015-07-08 12:05:21 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2015-07-09 11:08:40 -0400
commitd764e3e6cf1d8a7d510df2f724282f7a053cecda (patch)
treedd4eb4f040c31f8d1383cc52184759e1ca561119
parent863d43e74257b53bff68aa3fcbdeb7eb990e8527 (diff)
downloadmongo-d764e3e6cf1d8a7d510df2f724282f7a053cecda.tar.gz
SERVER-19109 Refactor text stage into matcher and scorer stage
Signed-off-by: Mark Benvenuto <mark.benvenuto@mongodb.com>
-rw-r--r--jstests/core/fts_explain.js3
-rw-r--r--src/mongo/db/exec/SConscript2
-rw-r--r--src/mongo/db/exec/plan_stats.h28
-rw-r--r--src/mongo/db/exec/text.cpp394
-rw-r--r--src/mongo/db/exec/text.h131
-rw-r--r--src/mongo/db/exec/text_match.cpp154
-rw-r--r--src/mongo/db/exec/text_match.h104
-rw-r--r--src/mongo/db/exec/text_or.cpp450
-rw-r--r--src/mongo/db/exec/text_or.h172
-rw-r--r--src/mongo/db/query/explain.cpp38
-rw-r--r--src/mongo/db/query/stage_types.h5
11 files changed, 997 insertions, 484 deletions
diff --git a/jstests/core/fts_explain.js b/jstests/core/fts_explain.js
index 3ec8fc0315d..7e667490cd1 100644
--- a/jstests/core/fts_explain.js
+++ b/jstests/core/fts_explain.js
@@ -16,6 +16,9 @@ if ("SINGLE_SHARD" === stage.stage) {
stage = stage.shards[0].executionStages;
}
assert.eq(stage.stage, "TEXT");
+assert.eq(stage.inputStage.stage, "TEXT_MATCH");
+assert.eq(stage.inputStage.inputStage.stage, "FETCH");
+assert.eq(stage.inputStage.inputStage.inputStage.stage, "TEXT_OR");
assert.eq(stage.parsedTextQuery.terms, ["a"]);
assert.eq(stage.parsedTextQuery.negatedTerms, ["b"]);
assert.eq(stage.parsedTextQuery.phrases, ["a"]);
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index fb613333953..035817e61cc 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -67,6 +67,8 @@ env.Library(
"stagedebug_cmd.cpp",
"subplan.cpp",
"text.cpp",
+ "text_match.cpp",
+ "text_or.cpp",
"update.cpp",
"working_set_common.cpp",
],
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index 2c66370fb5f..82f8cbc7e10 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -627,7 +627,7 @@ struct UpdateStats : public SpecificStats {
};
struct TextStats : public SpecificStats {
- TextStats() : keysExamined(0), fetches(0), parsedTextQuery() {}
+ TextStats() : parsedTextQuery() {}
virtual SpecificStats* clone() const {
TextStats* specific = new TextStats(*this);
@@ -636,10 +636,6 @@ struct TextStats : public SpecificStats {
std::string indexName;
- size_t keysExamined;
-
- size_t fetches;
-
// Human-readable form of the FTSQuery associated with the text stage.
BSONObj parsedTextQuery;
@@ -647,4 +643,26 @@ struct TextStats : public SpecificStats {
BSONObj indexPrefix;
};
+struct TextMatchStats : public SpecificStats {
+ TextMatchStats() : docsRejected(0) {}
+
+ virtual SpecificStats* clone() const {
+ TextMatchStats* specific = new TextMatchStats(*this);
+ return specific;
+ }
+
+ size_t docsRejected;
+};
+
+struct TextOrStats : public SpecificStats {
+ TextOrStats() : fetches(0) {}
+
+ virtual SpecificStats* clone() const {
+ TextOrStats* specific = new TextOrStats(*this);
+ return specific;
+ }
+
+ size_t fetches;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp
index 5c4123e29cf..56192851bd9 100644
--- a/src/mongo/db/exec/text.cpp
+++ b/src/mongo/db/exec/text.cpp
@@ -28,14 +28,16 @@
#include "mongo/db/exec/text.h"
-#include "mongo/base/owned_pointer_vector.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
+#include <vector>
+
+#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/filter.h"
#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/text_or.h"
+#include "mongo/db/exec/text_match.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set.h"
-#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/exec/working_set_computed_data.h"
+#include "mongo/db/fts/fts_index_format.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/stdx/memory.h"
@@ -47,31 +49,27 @@ using std::unique_ptr;
using std::vector;
using stdx::make_unique;
-// static
+using stdx::make_unique;
+
+using fts::FTSIndexFormat;
+using fts::MAX_WEIGHT;
+
const char* TextStage::kStageType = "TEXT";
TextStage::TextStage(OperationContext* txn,
const TextStageParams& params,
WorkingSet* ws,
const MatchExpression* filter)
- : _txn(txn),
- _params(params),
- _ftsMatcher(params.query, params.spec),
- _ws(ws),
- _filter(filter),
- _commonStats(kStageType),
- _internalState(INIT_SCANS),
- _currentIndexScanner(0),
- _idRetrying(WorkingSet::INVALID_ID) {
- _scoreIterator = _scores.end();
+ : _params(params), _textTreeRoot(buildTextTree(txn, ws, filter)), _commonStats(kStageType) {
_specificStats.indexPrefix = _params.indexPrefix;
_specificStats.indexName = _params.index->indexName();
+ _specificStats.parsedTextQuery = _params.query.toBSON();
}
TextStage::~TextStage() {}
bool TextStage::isEOF() {
- return _internalState == DONE;
+ return _textTreeRoot->isEOF();
}
PlanStage::StageState TextStage::work(WorkingSetID* out) {
@@ -83,33 +81,8 @@ PlanStage::StageState TextStage::work(WorkingSetID* out) {
if (isEOF()) {
return PlanStage::IS_EOF;
}
- invariant(_internalState != DONE);
-
- PlanStage::StageState stageState = PlanStage::IS_EOF;
-
- switch (_internalState) {
- case INIT_SCANS:
- try {
- stageState = initScans(out);
- } catch (const WriteConflictException& wce) {
- // Reset and try again next time.
- _internalState = INIT_SCANS;
- _scanners.clear();
- *out = WorkingSet::INVALID_ID;
- stageState = NEED_YIELD;
- }
- break;
- case READING_TERMS:
- stageState = readFromSubScanners(out);
- break;
- case RETURNING_RESULTS:
- stageState = returnResults(out);
- break;
- case DONE:
- // Handled above.
- break;
- }
+ PlanStage::StageState stageState = _textTreeRoot->work(out);
// Increment common stats counters that are specific to the return value of work().
switch (stageState) {
@@ -130,67 +103,33 @@ PlanStage::StageState TextStage::work(WorkingSetID* out) {
}
void TextStage::saveState() {
- _txn = NULL;
++_commonStats.yields;
- for (size_t i = 0; i < _scanners.size(); ++i) {
- _scanners.mutableVector()[i]->saveState();
- }
-
- if (_recordCursor)
- _recordCursor->saveUnpositioned();
+ _textTreeRoot->saveState();
}
void TextStage::restoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
++_commonStats.unyields;
- for (size_t i = 0; i < _scanners.size(); ++i) {
- _scanners.mutableVector()[i]->restoreState(opCtx);
- }
-
- if (_recordCursor)
- invariant(_recordCursor->restore(opCtx));
+ _textTreeRoot->restoreState(opCtx);
}
void TextStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
++_commonStats.invalidates;
- // Propagate invalidate to children.
- for (size_t i = 0; i < _scanners.size(); ++i) {
- _scanners.mutableVector()[i]->invalidate(txn, dl, type);
- }
-
- // We store the score keyed by RecordId. We have to toss out our state when the RecordId
- // changes.
- // TODO: If we're RETURNING_RESULTS we could somehow buffer the object.
- ScoreMap::iterator scoreIt = _scores.find(dl);
- if (scoreIt != _scores.end()) {
- if (scoreIt == _scoreIterator) {
- _scoreIterator++;
- }
- _scores.erase(scoreIt);
- }
+ _textTreeRoot->invalidate(txn, dl, type);
}
vector<PlanStage*> TextStage::getChildren() const {
- vector<PlanStage*> empty;
- return empty;
+ return {_textTreeRoot.get()};
}
unique_ptr<PlanStageStats> TextStage::getStats() {
_commonStats.isEOF = isEOF();
- // Add a BSON representation of the filter to the stats tree, if there is one.
- if (NULL != _filter) {
- BSONObjBuilder bob;
- _filter->toBSON(&bob);
- _commonStats.filter = bob.obj();
- }
-
unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_TEXT);
ret->specific = make_unique<TextStats>(_specificStats);
+ ret->children.push_back(_textTreeRoot->getStats().release());
return ret;
}
@@ -202,289 +141,34 @@ const SpecificStats* TextStage::getSpecificStats() const {
return &_specificStats;
}
-PlanStage::StageState TextStage::initScans(WorkingSetID* out) {
- invariant(0 == _scanners.size());
-
- _recordCursor = _params.index->getCollection()->getCursor(_txn);
-
- _specificStats.parsedTextQuery = _params.query.toBSON();
+unique_ptr<PlanStage> TextStage::buildTextTree(OperationContext* txn,
+ WorkingSet* ws,
+ const MatchExpression* filter) const {
+ auto textScorer = make_unique<TextOrStage>(txn, _params.spec, ws, filter, _params.index);
// Get all the index scans for each term in our query.
- // TODO it would be more efficient to only have one active scan at a time and create the
- // next when each finishes.
- for (std::set<std::string>::const_iterator it = _params.query.getTermsForBounds().begin();
- it != _params.query.getTermsForBounds().end();
- ++it) {
- const string& term = *it;
- IndexScanParams params;
- params.bounds.startKey = FTSIndexFormat::getIndexKey(
+ for (const auto& term : _params.query.getTermsForBounds()) {
+ IndexScanParams ixparams;
+
+ ixparams.bounds.startKey = FTSIndexFormat::getIndexKey(
MAX_WEIGHT, term, _params.indexPrefix, _params.spec.getTextIndexVersion());
- params.bounds.endKey = FTSIndexFormat::getIndexKey(
+ ixparams.bounds.endKey = FTSIndexFormat::getIndexKey(
0, term, _params.indexPrefix, _params.spec.getTextIndexVersion());
- params.bounds.endKeyInclusive = true;
- params.bounds.isSimpleRange = true;
- params.descriptor = _params.index;
- params.direction = -1;
- _scanners.mutableVector().push_back(new IndexScan(_txn, params, _ws, NULL));
- }
-
- // If we have no terms we go right to EOF.
- if (0 == _scanners.size()) {
- _internalState = DONE;
- return PlanStage::IS_EOF;
- }
-
- // Transition to the next state.
- _internalState = READING_TERMS;
- return PlanStage::NEED_TIME;
-}
-
-PlanStage::StageState TextStage::readFromSubScanners(WorkingSetID* out) {
- // This should be checked before we get here.
- invariant(_currentIndexScanner < _scanners.size());
-
- // Either retry the last WSM we worked on or get a new one from our current scanner.
- WorkingSetID id;
- StageState childState;
- if (_idRetrying == WorkingSet::INVALID_ID) {
- childState = _scanners.vector()[_currentIndexScanner]->work(&id);
- } else {
- childState = ADVANCED;
- id = _idRetrying;
- _idRetrying = WorkingSet::INVALID_ID;
- }
-
- if (PlanStage::ADVANCED == childState) {
- return addTerm(id, out);
- } else if (PlanStage::IS_EOF == childState) {
- // Done with this scan.
- ++_currentIndexScanner;
-
- if (_currentIndexScanner < _scanners.size()) {
- // We have another scan to read from.
- return PlanStage::NEED_TIME;
- }
-
- // If we're here we are done reading results. Move to the next state.
- _scoreIterator = _scores.begin();
- _internalState = RETURNING_RESULTS;
-
- // Don't need to keep these around.
- _scanners.clear();
- return PlanStage::NEED_TIME;
- } else {
- // Propagate WSID from below.
- *out = id;
- if (PlanStage::FAILURE == childState) {
- // If a stage fails, it may create a status WSM to indicate why it
- // failed, in which case 'id' is valid. If ID is invalid, we
- // create our own error message.
- if (WorkingSet::INVALID_ID == id) {
- mongoutils::str::stream ss;
- ss << "text stage failed to read in results from child";
- Status status(ErrorCodes::InternalError, ss);
- *out = WorkingSetCommon::allocateStatusMember(_ws, status);
- }
- }
- return childState;
- }
-}
-
-PlanStage::StageState TextStage::returnResults(WorkingSetID* out) {
- if (_scoreIterator == _scores.end()) {
- _internalState = DONE;
- return PlanStage::IS_EOF;
- }
-
- // Filter for phrases and negative terms, score and truncate.
- TextRecordData textRecordData = _scoreIterator->second;
-
- // Ignore non-matched documents.
- if (textRecordData.score < 0) {
- _scoreIterator++;
- invariant(textRecordData.wsid == WorkingSet::INVALID_ID);
- return PlanStage::NEED_TIME;
- }
-
- WorkingSetMember* wsm = _ws->get(textRecordData.wsid);
- try {
- if (!WorkingSetCommon::fetchIfUnfetched(_txn, _ws, textRecordData.wsid, _recordCursor)) {
- _scoreIterator++;
- _ws->free(textRecordData.wsid);
- _commonStats.needTime++;
- return NEED_TIME;
- }
- } catch (const WriteConflictException& wce) {
- // Do this record again next time around.
- *out = WorkingSet::INVALID_ID;
- _commonStats.needYield++;
- return NEED_YIELD;
- }
-
- _scoreIterator++;
-
- // Filter for phrases and negated terms
- if (!_ftsMatcher.matches(wsm->obj.value())) {
- _ws->free(textRecordData.wsid);
- return PlanStage::NEED_TIME;
- }
-
- // Populate the working set member with the text score and return it.
- wsm->addComputed(new TextScoreComputedData(textRecordData.score));
- *out = textRecordData.wsid;
- return PlanStage::ADVANCED;
-}
-
-class TextMatchableDocument : public MatchableDocument {
-public:
- TextMatchableDocument(OperationContext* txn,
- const BSONObj& keyPattern,
- const BSONObj& key,
- WorkingSet* ws,
- WorkingSetID id,
- unowned_ptr<RecordCursor> recordCursor)
- : _txn(txn),
- _recordCursor(recordCursor),
- _keyPattern(keyPattern),
- _key(key),
- _ws(ws),
- _id(id) {}
-
- BSONObj toBSON() const {
- return getObj();
- }
-
- virtual ElementIterator* allocateIterator(const ElementPath* path) const {
- WorkingSetMember* member = _ws->get(_id);
- if (!member->hasObj()) {
- // Try to look in the key.
- BSONObjIterator keyPatternIt(_keyPattern);
- BSONObjIterator keyDataIt(_key);
-
- while (keyPatternIt.more()) {
- BSONElement keyPatternElt = keyPatternIt.next();
- verify(keyDataIt.more());
- BSONElement keyDataElt = keyDataIt.next();
-
- if (path->fieldRef().equalsDottedField(keyPatternElt.fieldName())) {
- if (Array == keyDataElt.type()) {
- return new SimpleArrayElementIterator(keyDataElt, true);
- } else {
- return new SingleElementElementIterator(keyDataElt);
- }
- }
- }
- }
-
- // Go to the raw document, fetching if needed.
- return new BSONElementIterator(path, getObj());
- }
-
- virtual void releaseIterator(ElementIterator* iterator) const {
- delete iterator;
- }
-
- // Thrown if we detect that the document being matched was deleted.
- class DocumentDeletedException {};
-
-private:
- BSONObj getObj() const {
- if (!WorkingSetCommon::fetchIfUnfetched(_txn, _ws, _id, _recordCursor))
- throw DocumentDeletedException();
-
- WorkingSetMember* member = _ws->get(_id);
- // Make it owned since we are buffering results.
- member->obj.setValue(member->obj.value().getOwned());
- return member->obj.value();
- }
-
- OperationContext* _txn;
- unowned_ptr<RecordCursor> _recordCursor;
- BSONObj _keyPattern;
- BSONObj _key;
- WorkingSet* _ws;
- WorkingSetID _id;
-};
-
-PlanStage::StageState TextStage::addTerm(WorkingSetID wsid, WorkingSetID* out) {
- WorkingSetMember* wsm = _ws->get(wsid);
- invariant(wsm->getState() == WorkingSetMember::LOC_AND_IDX);
- invariant(1 == wsm->keyData.size());
- const IndexKeyDatum newKeyData = wsm->keyData.back(); // copy to keep it around.
-
- TextRecordData* textRecordData = &_scores[wsm->loc];
- double* documentAggregateScore = &textRecordData->score;
-
- if (WorkingSet::INVALID_ID == textRecordData->wsid) {
- // We haven't seen this RecordId before. Keep the working set member around
- // (it may be force-fetched on saveState()).
- textRecordData->wsid = wsid;
-
- if (_filter) {
- // We have not seen this document before and need to apply a filter.
- bool shouldKeep;
- bool wasDeleted = false;
- try {
- TextMatchableDocument tdoc(
- _txn, newKeyData.indexKeyPattern, newKeyData.keyData, _ws, wsid, _recordCursor);
- shouldKeep = _filter->matches(&tdoc);
- } catch (const WriteConflictException& wce) {
- _idRetrying = wsid;
- *out = WorkingSet::INVALID_ID;
- return NEED_YIELD;
- } catch (const TextMatchableDocument::DocumentDeletedException&) {
- // We attempted to fetch the document but decided it should be excluded from the
- // result set.
- shouldKeep = false;
- wasDeleted = true;
- }
-
- if (!shouldKeep) {
- if (wasDeleted || wsm->hasObj()) {
- // We had to fetch but we're not going to return it.
- ++_specificStats.fetches;
- }
- _ws->free(textRecordData->wsid);
- textRecordData->wsid = WorkingSet::INVALID_ID;
- *documentAggregateScore = -1;
- return NEED_TIME;
- }
- } else {
- // If we're here, we're going to return the doc, and we do a fetch later.
- ++_specificStats.fetches;
- }
- } else {
- // We already have a working set member for this RecordId. Free the new
- // WSM and retrieve the old one.
- // Note that since we don't keep all index keys, we could get a score that doesn't match
- // the document, but this has always been a problem.
- // TODO something to improve the situation.
- invariant(wsid != textRecordData->wsid);
- _ws->free(wsid);
- wsm = _ws->get(textRecordData->wsid);
- }
-
- ++_specificStats.keysExamined;
-
- if (*documentAggregateScore < 0) {
- // We have already rejected this document for not matching the filter.
- return NEED_TIME;
- }
+ ixparams.bounds.endKeyInclusive = true;
+ ixparams.bounds.isSimpleRange = true;
+ ixparams.descriptor = _params.index;
+ ixparams.direction = -1;
- // Locate score within possibly compound key: {prefix,term,score,suffix}.
- BSONObjIterator keyIt(newKeyData.keyData);
- for (unsigned i = 0; i < _params.spec.numExtraBefore(); i++) {
- keyIt.next();
+ textScorer->addChild(make_unique<IndexScan>(txn, ixparams, ws, nullptr));
}
- keyIt.next(); // Skip past 'term'.
+ auto fetcher = make_unique<FetchStage>(
+ txn, ws, textScorer.release(), nullptr, _params.index->getCollection());
- BSONElement scoreElement = keyIt.next();
- double documentTermScore = scoreElement.number();
+ auto matcher = make_unique<TextMatchStage>(std::move(fetcher), _params.query, _params.spec, ws);
- // Aggregate relevance score, term keys.
- *documentAggregateScore += documentTermScore;
- return NEED_TIME;
+ unique_ptr<PlanStage> treeRoot = std::move(matcher);
+ return treeRoot;
}
} // namespace mongo
diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h
index f28b575570d..27b5bc62231 100644
--- a/src/mongo/db/exec/text.h
+++ b/src/mongo/db/exec/text.h
@@ -28,29 +28,23 @@
#pragma once
+#include <memory>
+#include <vector>
+
#include "mongo/db/exec/plan_stage.h"
-#include "mongo/db/fts/fts_index_format.h"
-#include "mongo/db/fts/fts_matcher.h"
+#include "mongo/db/exec/working_set.h"
#include "mongo/db/fts/fts_query.h"
#include "mongo/db/fts/fts_spec.h"
#include "mongo/db/fts/fts_util.h"
#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/matcher/expression.h"
-#include "mongo/db/record_id.h"
-#include "mongo/platform/unordered_map.h"
-
-#include <map>
-#include <queue>
-#include <vector>
namespace mongo {
-using fts::FTSIndexFormat;
-using fts::FTSMatcher;
+using std::unique_ptr;
+using std::vector;
+
using fts::FTSQuery;
using fts::FTSSpec;
-using fts::MAX_WEIGHT;
class OperationContext;
@@ -73,132 +67,55 @@ struct TextStageParams {
/**
* Implements a blocking stage that returns text search results.
*
- * Prerequisites: None; is a leaf node.
- * Output type: LOC_AND_OBJ_UNOWNED.
- *
- * TODO: Should the TextStage ever generate NEED_YIELD requests for fetching MMAP v1 records?
- * Right now this stage could reduce concurrency by failing to request a yield during fetch.
+ * Output type: LOC_AND_OBJ.
*/
class TextStage : public PlanStage {
public:
- /**
- * The text stage has a few 'states' it transitions between.
- */
- enum State {
- // 1. Initialize the index scans we use to retrieve term/score info.
- INIT_SCANS,
-
- // 2. Read the terms/scores from the text index.
- READING_TERMS,
-
- // 3. Return results to our parent.
- RETURNING_RESULTS,
-
- // 4. Done.
- DONE,
- };
-
TextStage(OperationContext* txn,
const TextStageParams& params,
WorkingSet* ws,
const MatchExpression* filter);
- virtual ~TextStage();
+ ~TextStage() final;
- virtual StageState work(WorkingSetID* out);
- virtual bool isEOF();
+ StageState work(WorkingSetID* out) final;
+ bool isEOF() final;
- virtual void saveState();
- virtual void restoreState(OperationContext* opCtx);
- virtual void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
+ void saveState() final;
+ void restoreState(OperationContext* opCtx) final;
+ void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
- virtual std::vector<PlanStage*> getChildren() const;
+ vector<PlanStage*> getChildren() const;
- virtual StageType stageType() const {
+ StageType stageType() const final {
return STAGE_TEXT;
}
std::unique_ptr<PlanStageStats> getStats();
- virtual const CommonStats* getCommonStats() const;
+ const CommonStats* getCommonStats() const final;
- virtual const SpecificStats* getSpecificStats() const;
+ const SpecificStats* getSpecificStats() const final;
static const char* kStageType;
private:
/**
- * Initializes sub-scanners.
+ * Helper method to built the query execution plan for the text stage.
*/
- StageState initScans(WorkingSetID* out);
-
- /**
- * Helper for buffering results array. Returns NEED_TIME (if any results were produced),
- * IS_EOF, or FAILURE.
- */
- StageState readFromSubScanners(WorkingSetID* out);
-
- /**
- * Helper called from readFromSubScanners to update aggregate score with a new-found (term,
- * score) pair for this document. Also rejects documents that don't match this stage's
- * filter.
- */
- StageState addTerm(WorkingSetID wsid, WorkingSetID* out);
-
- /**
- * Possibly return a result. FYI, this may perform a fetch directly if it is needed to
- * evaluate all filters.
- */
- StageState returnResults(WorkingSetID* out);
-
- // transactional context for read locks. Not owned by us
- OperationContext* _txn;
+ unique_ptr<PlanStage> buildTextTree(OperationContext* txn,
+ WorkingSet* ws,
+ const MatchExpression* filter) const;
// Parameters of this text stage.
TextStageParams _params;
- // Text-specific phrase and negated term matcher.
- FTSMatcher _ftsMatcher;
-
- // Working set. Not owned by us.
- WorkingSet* _ws;
-
- // Filter. Not owned by us.
- const MatchExpression* _filter;
+ // The root of the text query tree.
+ unique_ptr<PlanStage> _textTreeRoot;
// Stats.
CommonStats _commonStats;
TextStats _specificStats;
-
- // What state are we in? See the State enum above.
- State _internalState;
-
- // Used in INIT_SCANS and READING_TERMS. The index scans we're using to retrieve text
- // terms.
- OwnedPointerVector<PlanStage> _scanners;
-
- // Which _scanners are we currently reading from?
- size_t _currentIndexScanner;
-
- // If not Null, we use this rather than asking our child what to do next.
- WorkingSetID _idRetrying;
-
- // Map each buffered record id to this data.
- struct TextRecordData {
- TextRecordData() : wsid(WorkingSet::INVALID_ID), score(0.0) {}
- WorkingSetID wsid;
- double score;
- };
-
- // Temporary score data filled out by sub-scans. Used in READING_TERMS and
- // RETURNING_RESULTS.
- // Maps from diskloc -> (aggregate score for doc, wsid).
- typedef unordered_map<RecordId, TextRecordData, RecordId::Hasher> ScoreMap;
- ScoreMap _scores;
- ScoreMap::const_iterator _scoreIterator;
-
- // Used for fetching records from the collection.
- std::unique_ptr<RecordCursor> _recordCursor;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/text_match.cpp b/src/mongo/db/exec/text_match.cpp
new file mode 100644
index 00000000000..8edb78b2a3e
--- /dev/null
+++ b/src/mongo/db/exec/text_match.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/text_match.h"
+
+#include <vector>
+
+#include "mongo/db/exec/scoped_timer.h"
+#include "mongo/db/exec/working_set.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+using std::unique_ptr;
+using std::vector;
+using stdx::make_unique;
+
+const char* TextMatchStage::kStageType = "TEXT_MATCH";
+
+TextMatchStage::TextMatchStage(unique_ptr<PlanStage> child,
+ const FTSQuery& query,
+ const FTSSpec& spec,
+ WorkingSet* ws)
+ : _ftsMatcher(query, spec), _ws(ws), _child(std::move(child)), _commonStats(kStageType) {}
+
+TextMatchStage::~TextMatchStage() {}
+
+bool TextMatchStage::isEOF() {
+ return _child->isEOF();
+}
+
+void TextMatchStage::saveState() {
+ ++_commonStats.yields;
+
+ _child->saveState();
+}
+
+void TextMatchStage::restoreState(OperationContext* opCtx) {
+ ++_commonStats.unyields;
+
+ _child->restoreState(opCtx);
+}
+
+void TextMatchStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+
+ _child->invalidate(txn, dl, type);
+}
+
+vector<PlanStage*> TextMatchStage::getChildren() const {
+ return {_child.get()};
+}
+
+std::unique_ptr<PlanStageStats> TextMatchStage::getStats() {
+ _commonStats.isEOF = isEOF();
+
+ unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_TEXT_MATCH);
+ ret->specific = make_unique<TextMatchStats>(_specificStats);
+ ret->children.push_back(_child->getStats().release());
+
+ return ret;
+}
+
+const CommonStats* TextMatchStage::getCommonStats() const {
+ return &_commonStats;
+}
+
+const SpecificStats* TextMatchStage::getSpecificStats() const {
+ return &_specificStats;
+}
+
+PlanStage::StageState TextMatchStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
+
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
+ }
+
+ // Retrieve fetched document from child.
+ StageState stageState = _child->work(out);
+
+ if (stageState == PlanStage::ADVANCED) {
+ // We just successfully retrieved a fetched doc.
+ WorkingSetMember* wsm = _ws->get(*out);
+
+ // Filter for phrases and negated terms.
+ if (!_ftsMatcher.matches(wsm->obj.value())) {
+ _ws->free(*out);
+ *out = WorkingSet::INVALID_ID;
+ ++_specificStats.docsRejected;
+ stageState = PlanStage::NEED_TIME;
+ }
+ } else if (stageState == PlanStage::FAILURE) {
+ // If a stage fails, it may create a status WSM to indicate why it
+ // failed, in which case '*out' is valid. If ID is invalid, we
+ // create our own error message.
+ if (WorkingSet::INVALID_ID == *out) {
+ str::stream ss;
+ ss << "TEXT_MATCH stage failed to read in results from child";
+ Status status(ErrorCodes::InternalError, ss);
+ *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+ }
+ }
+
+ // Increment common stats counters that are specific to the return value of work().
+ switch (stageState) {
+ case PlanStage::ADVANCED:
+ ++_commonStats.advanced;
+ break;
+ case PlanStage::NEED_TIME:
+ ++_commonStats.needTime;
+ break;
+ case PlanStage::NEED_YIELD:
+ ++_commonStats.needYield;
+ break;
+ default:
+ break;
+ }
+
+ return stageState;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/text_match.h b/src/mongo/db/exec/text_match.h
new file mode 100644
index 00000000000..bc1cf6b528f
--- /dev/null
+++ b/src/mongo/db/exec/text_match.h
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/working_set.h"
+#include "mongo/db/fts/fts_matcher.h"
+#include "mongo/db/fts/fts_query.h"
+#include "mongo/db/fts/fts_spec.h"
+
+namespace mongo {
+
+using std::unique_ptr;
+
+using fts::FTSMatcher;
+using fts::FTSQuery;
+using fts::FTSSpec;
+
+
+class OperationContext;
+class RecordID;
+
+/**
+ * A stage that returns every document in the child that satisfies the FTS text matcher built with
+ * the query parameter.
+ *
+ * Prerequisites: A single child stage that passes up WorkingSetMembers in the LOC_AND_OBJ state,
+ * with associated text scores.
+ */
+class TextMatchStage final : public PlanStage {
+public:
+ TextMatchStage(unique_ptr<PlanStage> child,
+ const FTSQuery& query,
+ const FTSSpec& spec,
+ WorkingSet* ws);
+ ~TextMatchStage() final;
+
+ void addChild(PlanStage* child);
+
+ bool isEOF() final;
+
+ StageState work(WorkingSetID* out) final;
+
+ void saveState() final;
+ void restoreState(OperationContext* opCtx) final;
+ void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
+
+ std::vector<PlanStage*> getChildren() const final;
+
+ StageType stageType() const final {
+ return STAGE_TEXT_MATCH;
+ }
+
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ const CommonStats* getCommonStats() const final;
+
+ const SpecificStats* getSpecificStats() const final;
+
+ static const char* kStageType;
+
+private:
+ // Text-specific phrase and negated term matcher.
+ FTSMatcher _ftsMatcher;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // The child PlanStage that provides the RecordIDs and scores for text matching.
+ unique_ptr<PlanStage> _child;
+
+ // Stats
+ CommonStats _commonStats;
+ TextMatchStats _specificStats;
+};
+} // namespace mongo
diff --git a/src/mongo/db/exec/text_or.cpp b/src/mongo/db/exec/text_or.cpp
new file mode 100644
index 00000000000..be600e3639d
--- /dev/null
+++ b/src/mongo/db/exec/text_or.cpp
@@ -0,0 +1,450 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/text_or.h"
+
+#include <map>
+#include <vector>
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/scoped_timer.h"
+#include "mongo/db/exec/working_set.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/exec/working_set_computed_data.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher/matchable.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/record_id.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+using std::unique_ptr;
+using std::vector;
+using std::string;
+using stdx::make_unique;
+
+using fts::FTSSpec;
+
+const char* TextOrStage::kStageType = "TEXT_OR";
+
+TextOrStage::TextOrStage(OperationContext* txn,
+ const FTSSpec& ftsSpec,
+ WorkingSet* ws,
+ const MatchExpression* filter,
+ IndexDescriptor* index)
+ : _ftsSpec(ftsSpec),
+ _ws(ws),
+ _scoreIterator(_scores.end()),
+ _commonStats(kStageType),
+ _filter(filter),
+ _txn(txn),
+ _idRetrying(WorkingSet::INVALID_ID),
+ _index(index) {}
+
+TextOrStage::~TextOrStage() {}
+
+void TextOrStage::addChild(unique_ptr<PlanStage> child) {
+ _children.push_back(std::move(child));
+}
+
+bool TextOrStage::isEOF() {
+ return _internalState == State::kDone;
+}
+
+void TextOrStage::saveState() {
+ _txn = NULL;
+ ++_commonStats.yields;
+
+ for (auto& child : _children) {
+ child->saveState();
+ }
+
+ if (_recordCursor) {
+ _recordCursor->saveUnpositioned();
+ }
+}
+
+void TextOrStage::restoreState(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ ++_commonStats.unyields;
+
+ for (auto& child : _children) {
+ child->restoreState(opCtx);
+ }
+
+ if (_recordCursor) {
+ invariant(_recordCursor->restore(opCtx));
+ }
+}
+
+void TextOrStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+
+ // Propagate invalidate to children.
+ for (auto& child : _children) {
+ child->invalidate(txn, dl, type);
+ }
+
+ // Remove the RecordID from the ScoreMap.
+ ScoreMap::iterator scoreIt = _scores.find(dl);
+ if (scoreIt != _scores.end()) {
+ if (scoreIt == _scoreIterator) {
+ _scoreIterator++;
+ }
+ _scores.erase(scoreIt);
+ }
+}
+
+vector<PlanStage*> TextOrStage::getChildren() const {
+ std::vector<PlanStage*> vec;
+ for (auto& child : _children) {
+ vec.push_back(child.get());
+ }
+ return vec;
+}
+
+std::unique_ptr<PlanStageStats> TextOrStage::getStats() {
+ _commonStats.isEOF = isEOF();
+
+ if (_filter) {
+ BSONObjBuilder bob;
+ _filter->toBSON(&bob);
+ _commonStats.filter = bob.obj();
+ }
+
+ unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_TEXT_OR);
+ ret->specific = make_unique<TextOrStats>(_specificStats);
+
+ for (auto& child : _children) {
+ ret->children.push_back(child->getStats().release());
+ }
+
+ return ret;
+}
+
+const CommonStats* TextOrStage::getCommonStats() const {
+ return &_commonStats;
+}
+
+const SpecificStats* TextOrStage::getSpecificStats() const {
+ return &_specificStats;
+}
+
+PlanStage::StageState TextOrStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
+
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
+ }
+
+ PlanStage::StageState stageState = PlanStage::IS_EOF;
+
+ switch (_internalState) {
+ case State::kInit:
+ stageState = initStage(out);
+ break;
+ case State::kReadingTerms:
+ stageState = readFromChildren(out);
+ break;
+ case State::kReturningResults:
+ stageState = returnResults(out);
+ break;
+ case State::kDone:
+ // Should have been handled above.
+ invariant(false);
+ break;
+ }
+
+ // Increment common stats counters.
+ switch (stageState) {
+ case PlanStage::ADVANCED:
+ ++_commonStats.advanced;
+ break;
+ case PlanStage::NEED_TIME:
+ ++_commonStats.needTime;
+ break;
+ case PlanStage::NEED_YIELD:
+ ++_commonStats.needYield;
+ break;
+ default:
+ break;
+ }
+
+ return stageState;
+}
+
+PlanStage::StageState TextOrStage::initStage(WorkingSetID* out) {
+ *out = WorkingSet::INVALID_ID;
+ try {
+ _recordCursor = _index->getCollection()->getCursor(_txn);
+ _internalState = State::kReadingTerms;
+ return PlanStage::NEED_TIME;
+ } catch (const WriteConflictException& wce) {
+ invariant(_internalState == State::kInit);
+ _recordCursor.reset();
+ return PlanStage::NEED_YIELD;
+ }
+}
+
+PlanStage::StageState TextOrStage::readFromChildren(WorkingSetID* out) {
+ // Check to see if there were any children added in the first place.
+ if (_children.size() == 0) {
+ _internalState = State::kDone;
+ return PlanStage::IS_EOF;
+ }
+ invariant(_currentChild < _children.size());
+
+ // Either retry the last WSM we worked on or get a new one from our current child.
+ WorkingSetID id;
+ StageState childState;
+ if (_idRetrying == WorkingSet::INVALID_ID) {
+ childState = _children[_currentChild]->work(&id);
+ } else {
+ childState = ADVANCED;
+ id = _idRetrying;
+ _idRetrying = WorkingSet::INVALID_ID;
+ }
+
+ if (PlanStage::ADVANCED == childState) {
+ return addTerm(id, out);
+ } else if (PlanStage::IS_EOF == childState) {
+ // Done with this child.
+ ++_currentChild;
+
+ if (_currentChild < _children.size()) {
+ // We have another child to read from.
+ return PlanStage::NEED_TIME;
+ }
+
+ // If we're here we are done reading results. Move to the next state.
+ _scoreIterator = _scores.begin();
+ _internalState = State::kReturningResults;
+
+ return PlanStage::NEED_TIME;
+ } else if (PlanStage::FAILURE == childState) {
+ // If a stage fails, it may create a status WSM to indicate why it
+ // failed, in which case 'id' is valid. If ID is invalid, we
+ // create our own error message.
+ if (WorkingSet::INVALID_ID == id) {
+ mongoutils::str::stream ss;
+ ss << "TEXT_OR stage failed to read in results from child";
+ Status status(ErrorCodes::InternalError, ss);
+ *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+ } else {
+ *out = id;
+ }
+ return PlanStage::FAILURE;
+ } else {
+ // Propagate WSID from below.
+ *out = id;
+ return childState;
+ }
+}
+
+PlanStage::StageState TextOrStage::returnResults(WorkingSetID* out) {
+ if (_scoreIterator == _scores.end()) {
+ _internalState = State::kDone;
+ return PlanStage::IS_EOF;
+ }
+
+ // Retrieve the record that contains the text score.
+ TextRecordData textRecordData = _scoreIterator->second;
+ ++_scoreIterator;
+
+ // Ignore non-matched documents.
+ if (textRecordData.score < 0) {
+ invariant(textRecordData.wsid == WorkingSet::INVALID_ID);
+ return PlanStage::NEED_TIME;
+ }
+
+ WorkingSetMember* wsm = _ws->get(textRecordData.wsid);
+
+ // Populate the working set member with the text score and return it.
+ wsm->addComputed(new TextScoreComputedData(textRecordData.score));
+ *out = textRecordData.wsid;
+ return PlanStage::ADVANCED;
+}
+
+/**
+ * Provides support for covered matching on non-text fields of a compound text index.
+ */
+class TextMatchableDocument : public MatchableDocument {
+public:
+ TextMatchableDocument(OperationContext* txn,
+ const BSONObj& keyPattern,
+ const BSONObj& key,
+ WorkingSet* ws,
+ WorkingSetID id,
+ unowned_ptr<RecordCursor> recordCursor)
+ : _txn(txn),
+ _recordCursor(recordCursor),
+ _keyPattern(keyPattern),
+ _key(key),
+ _ws(ws),
+ _id(id) {}
+
+ BSONObj toBSON() const {
+ return getObj();
+ }
+
+ virtual ElementIterator* allocateIterator(const ElementPath* path) const {
+ WorkingSetMember* member = _ws->get(_id);
+ if (!member->hasObj()) {
+ // Try to look in the key.
+ BSONObjIterator keyPatternIt(_keyPattern);
+ BSONObjIterator keyDataIt(_key);
+
+ while (keyPatternIt.more()) {
+ BSONElement keyPatternElt = keyPatternIt.next();
+ verify(keyDataIt.more());
+ BSONElement keyDataElt = keyDataIt.next();
+
+ if (path->fieldRef().equalsDottedField(keyPatternElt.fieldName())) {
+ if (Array == keyDataElt.type()) {
+ return new SimpleArrayElementIterator(keyDataElt, true);
+ } else {
+ return new SingleElementElementIterator(keyDataElt);
+ }
+ }
+ }
+ }
+
+ // Go to the raw document, fetching if needed.
+ return new BSONElementIterator(path, getObj());
+ }
+
+ virtual void releaseIterator(ElementIterator* iterator) const {
+ delete iterator;
+ }
+
+ // Thrown if we detect that the document being matched was deleted.
+ class DocumentDeletedException {};
+
+private:
+ BSONObj getObj() const {
+ if (!WorkingSetCommon::fetchIfUnfetched(_txn, _ws, _id, _recordCursor))
+ throw DocumentDeletedException();
+
+ WorkingSetMember* member = _ws->get(_id);
+
+ // Make it owned since we are buffering results.
+ member->obj.setValue(member->obj.value().getOwned());
+ return member->obj.value();
+ }
+
+ OperationContext* _txn;
+ unowned_ptr<RecordCursor> _recordCursor;
+ BSONObj _keyPattern;
+ BSONObj _key;
+ WorkingSet* _ws;
+ WorkingSetID _id;
+};
+
+PlanStage::StageState TextOrStage::addTerm(WorkingSetID wsid, WorkingSetID* out) {
+ WorkingSetMember* wsm = _ws->get(wsid);
+ invariant(wsm->getState() == WorkingSetMember::LOC_AND_IDX);
+ invariant(1 == wsm->keyData.size());
+ const IndexKeyDatum newKeyData = wsm->keyData.back(); // copy to keep it around.
+
+ TextRecordData* textRecordData = &_scores[wsm->loc];
+ double* documentAggregateScore = &textRecordData->score;
+
+ if (WorkingSet::INVALID_ID == textRecordData->wsid) {
+ // We haven't seen this RecordId before. Keep the working set member around (it may be
+ // force-fetched on saveState()).
+ textRecordData->wsid = wsid;
+
+ if (_filter) {
+ // We have not seen this document before and need to apply a filter.
+ bool shouldKeep;
+ bool wasDeleted = false;
+ try {
+ TextMatchableDocument tdoc(
+ _txn, newKeyData.indexKeyPattern, newKeyData.keyData, _ws, wsid, _recordCursor);
+ shouldKeep = _filter->matches(&tdoc);
+ } catch (const WriteConflictException& wce) {
+ _idRetrying = wsid;
+ *out = WorkingSet::INVALID_ID;
+ return NEED_YIELD;
+ } catch (const TextMatchableDocument::DocumentDeletedException&) {
+ // We attempted to fetch the document but decided it should be excluded from the
+ // result set.
+ shouldKeep = false;
+ wasDeleted = true;
+ }
+
+ if (!shouldKeep) {
+ if (wasDeleted || wsm->hasObj()) {
+ // We had to fetch but we're not going to return it.
+ ++_specificStats.fetches;
+ }
+ _ws->free(textRecordData->wsid);
+ textRecordData->wsid = WorkingSet::INVALID_ID;
+ *documentAggregateScore = -1;
+ return NEED_TIME;
+ }
+ }
+
+ } else {
+ // We already have a working set member for this RecordId. Free the new WSM and retrieve the
+ // old one. Note that since we don't keep all index keys, we could get a score that doesn't
+ // match the document, but this has always been a problem.
+ // TODO something to improve the situation.
+ invariant(wsid != textRecordData->wsid);
+ _ws->free(wsid);
+ wsm = _ws->get(textRecordData->wsid);
+ }
+
+ if (*documentAggregateScore < 0) {
+ // We have already rejected this document for not matching the filter.
+ return NEED_TIME;
+ }
+
+ // Locate score within possibly compound key: {prefix,term,score,suffix}.
+ BSONObjIterator keyIt(newKeyData.keyData);
+ for (unsigned i = 0; i < _ftsSpec.numExtraBefore(); i++) {
+ keyIt.next();
+ }
+
+ keyIt.next(); // Skip past 'term'.
+
+ BSONElement scoreElement = keyIt.next();
+ double documentTermScore = scoreElement.number();
+
+ // Aggregate relevance score, term keys.
+ *documentAggregateScore += documentTermScore;
+ return NEED_TIME;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/text_or.h b/src/mongo/db/exec/text_or.h
new file mode 100644
index 00000000000..4de8b5bade2
--- /dev/null
+++ b/src/mongo/db/exec/text_or.h
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/fts/fts_spec.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/matcher/expression.h"
+#include "mongo/db/record_id.h"
+
+namespace mongo {
+
+using std::unique_ptr;
+using std::vector;
+
+using fts::FTSSpec;
+
+class OperationContext;
+
+/**
+ * A blocking stage that returns the set of WSMs with RecordIDs of all of the documents that contain
+ * the positive terms in the search query, as well as their scores.
+ *
+ * The WorkingSetMembers returned are in the LOC_AND_IDX state. If a filter is passed in, some
+ * WorkingSetMembers may be returned in the LOC_AND_OBJ state.
+ */
+class TextOrStage final : public PlanStage {
+public:
+ /**
+ * Internal states.
+ */
+ enum class State {
+ // 1. Initialize the _recordCursor.
+ kInit,
+
+ // 2. Read the terms/scores from the text index.
+ kReadingTerms,
+
+ // 3. Return results to our parent.
+ kReturningResults,
+
+ // 4. Finished.
+ kDone,
+ };
+
+ TextOrStage(OperationContext* txn,
+ const FTSSpec& ftsSpec,
+ WorkingSet* ws,
+ const MatchExpression* filter,
+ IndexDescriptor* index);
+ ~TextOrStage() final;
+
+ void addChild(unique_ptr<PlanStage> child);
+
+ bool isEOF() final;
+
+ StageState work(WorkingSetID* out) final;
+
+ void saveState() final;
+ void restoreState(OperationContext* opCtx) final;
+ void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
+
+ std::vector<PlanStage*> getChildren() const final;
+
+ StageType stageType() const final {
+ return STAGE_TEXT_OR;
+ }
+
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ const CommonStats* getCommonStats() const final;
+
+ const SpecificStats* getSpecificStats() const final;
+
+ static const char* kStageType;
+
+private:
+ /**
+ * Worker for kInit. Initializes the _recordCursor member and handles the potential for
+ * getCursor() to throw WriteConflictException.
+ */
+ StageState initStage(WorkingSetID* out);
+
+ /**
+ * Worker for kReadingTerms. Reads from the children, searching for the terms in the query and
+ * populates the score map.
+ */
+ StageState readFromChildren(WorkingSetID* out);
+
+ /**
+ * Helper called from readFromChildren to update aggregate score with a newfound (term, score)
+ * pair for this document.
+ */
+ StageState addTerm(WorkingSetID wsid, WorkingSetID* out);
+
+ /**
+ * Worker for kReturningResults. Returns a wsm with RecordID and Score.
+ */
+ StageState returnResults(WorkingSetID* out);
+
+ // The index spec used to determine where to find the score.
+ FTSSpec _ftsSpec;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // What state are we in? See the State enum above.
+ State _internalState = State::kInit;
+
+ // Children owned by us.
+ vector<unique_ptr<PlanStage>> _children;
+
+ // Which of _children are we calling work(...) on now?
+ size_t _currentChild = 0;
+
+ /**
+ * Temporary score data filled out by children.
+ * Maps from RecordID -> (aggregate score for doc, wsid).
+ * Map each buffered record id to this data.
+ */
+ struct TextRecordData {
+ TextRecordData() : wsid(WorkingSet::INVALID_ID), score(0.0) {}
+ WorkingSetID wsid;
+ double score;
+ };
+
+ typedef unordered_map<RecordId, TextRecordData, RecordId::Hasher> ScoreMap;
+ ScoreMap _scores;
+ ScoreMap::const_iterator _scoreIterator;
+
+ // Stats
+ CommonStats _commonStats;
+ TextOrStats _specificStats;
+
+ // Members needed only for using the TextMatchableDocument.
+ const MatchExpression* _filter;
+ OperationContext* _txn;
+ WorkingSetID _idRetrying;
+ std::unique_ptr<RecordCursor> _recordCursor;
+ IndexDescriptor* _index;
+};
+}
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index 7b1cd331a88..28aa656fbe5 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -109,9 +109,6 @@ size_t getKeysExamined(StageType type, const SpecificStats* specific) {
} else if (STAGE_IDHACK == type) {
const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
return spec->keysExamined;
- } else if (STAGE_TEXT == type) {
- const TextStats* spec = static_cast<const TextStats*>(specific);
- return spec->keysExamined;
} else if (STAGE_COUNT_SCAN == type) {
const CountScanStats* spec = static_cast<const CountScanStats*>(specific);
return spec->keysExamined;
@@ -133,18 +130,18 @@ size_t getKeysExamined(StageType type, const SpecificStats* specific) {
* (in which case this gets called from Explain::getSummaryStats()).
*/
size_t getDocsExamined(StageType type, const SpecificStats* specific) {
- if (STAGE_IDHACK == type) {
- const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
- return spec->docsExamined;
- } else if (STAGE_TEXT == type) {
- const TextStats* spec = static_cast<const TextStats*>(specific);
- return spec->fetches;
+ if (STAGE_COLLSCAN == type) {
+ const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific);
+ return spec->docsTested;
} else if (STAGE_FETCH == type) {
const FetchStats* spec = static_cast<const FetchStats*>(specific);
return spec->docsExamined;
- } else if (STAGE_COLLSCAN == type) {
- const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific);
- return spec->docsTested;
+ } else if (STAGE_IDHACK == type) {
+ const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
+ return spec->docsExamined;
+ } else if (STAGE_TEXT_OR == type) {
+ const TextOrStats* spec = static_cast<const TextOrStats*>(specific);
+ return spec->fetches;
}
return 0;
@@ -387,14 +384,21 @@ void Explain::statsToBSON(const PlanStageStats& stats,
} else if (STAGE_TEXT == stats.stageType) {
TextStats* spec = static_cast<TextStats*>(stats.specific.get());
- if (verbosity >= ExplainCommon::EXEC_STATS) {
- bob->appendNumber("keysExamined", spec->keysExamined);
- bob->appendNumber("docsExamined", spec->fetches);
- }
-
bob->append("indexPrefix", spec->indexPrefix);
bob->append("indexName", spec->indexName);
bob->append("parsedTextQuery", spec->parsedTextQuery);
+ } else if (STAGE_TEXT_MATCH == stats.stageType) {
+ TextMatchStats* spec = static_cast<TextMatchStats*>(stats.specific.get());
+
+ if (verbosity >= ExplainCommon::EXEC_STATS) {
+ bob->appendNumber("docsRejected", spec->docsRejected);
+ }
+ } else if (STAGE_TEXT_OR == stats.stageType) {
+ TextOrStats* spec = static_cast<TextOrStats*>(stats.specific.get());
+
+ if (verbosity >= ExplainCommon::EXEC_STATS) {
+ bob->appendNumber("docsExamined", spec->fetches);
+ }
} else if (STAGE_UPDATE == stats.stageType) {
UpdateStats* spec = static_cast<UpdateStats*>(stats.specific.get());
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index 5a4981dc81b..311fa2925d5 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -92,7 +92,12 @@ enum StageType {
STAGE_SORT,
STAGE_SORT_MERGE,
STAGE_SUBPLAN,
+
+ // Stages for running text search.
STAGE_TEXT,
+ STAGE_TEXT_OR,
+ STAGE_TEXT_MATCH,
+
STAGE_UNKNOWN,
STAGE_UPDATE,