diff options
author | David Storch <david.storch@10gen.com> | 2015-01-05 11:09:07 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-01-07 09:07:20 -0500 |
commit | c11002f5f414b2b9f18b8abc69b4c69efc82f1fd (patch) | |
tree | ad827bbabb4316150146e2152f01b5776b61486a | |
parent | 68da73df17d5a26bcc1151013ee5298b2c7df909 (diff) | |
download | mongo-c11002f5f414b2b9f18b8abc69b4c69efc82f1fd.tar.gz |
SERVER-16675 force fetch RecordIds buffered by the query system on saveState()
This fixes an issue with WiredTiger query isolation.
-rw-r--r-- | jstests/core/getmore_invalidation.js | 80 | ||||
-rw-r--r-- | src/mongo/db/exec/and_common-inl.h | 15 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/delete.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/exec/merge_sort.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/multi_iterator.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/text.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/exec/text.h | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/update.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/exec/working_set.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/exec/working_set.h | 59 | ||||
-rw-r--r-- | src/mongo/db/exec/working_set_common.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/exec/working_set_common.h | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/working_set_test.cpp | 96 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/query/query_solution.h | 9 |
16 files changed, 476 insertions, 45 deletions
diff --git a/jstests/core/getmore_invalidation.js b/jstests/core/getmore_invalidation.js new file mode 100644 index 00000000000..5fc67286ef2 --- /dev/null +++ b/jstests/core/getmore_invalidation.js @@ -0,0 +1,80 @@ +// Tests for invalidation during a getmore. This behavior is storage-engine dependent. +// See SERVER-16675. + +var t = db.jstests_getmore_invalidation; + +// Case #1: Text search with deletion invalidation. +t.drop(); +assert.commandWorked(t.ensureIndex({a: "text"})); +assert.writeOK(t.insert({_id: 1, a: "bar"})); +assert.writeOK(t.insert({_id: 2, a: "bar"})); +assert.writeOK(t.insert({_id: 3, a: "bar"})); + +var cursor = t.find({$text: {$search: "bar"}}).batchSize(2); +cursor.next(); +cursor.next(); + +assert.writeOK(t.remove({_id: 3})); + +// We should get back the document or not (depending on the storage engine / concurrency model). +// Either is fine as long as we don't crash. +var count = cursor.itcount(); +assert(count === 0 || count === 1); + +// Case #2: Text search with mutation invalidation. +t.drop(); +assert.commandWorked(t.ensureIndex({a: "text"})); +assert.writeOK(t.insert({_id: 1, a: "bar"})); +assert.writeOK(t.insert({_id: 2, a: "bar"})); +assert.writeOK(t.insert({_id: 3, a: "bar"})); + +var cursor = t.find({$text: {$search: "bar"}}).batchSize(2); +cursor.next(); +cursor.next(); + +// Update the next matching doc so that it no longer matches. +assert.writeOK(t.update({_id: 3}, {$set: {a: "nomatch"}})); + +// Either the cursor should skip the result that no longer matches, or we should get back the old +// version of the doc. +assert(!cursor.hasNext() || cursor.next()["a"] === "bar"); + +// Case #3: Merge sort with deletion invalidation. +t.drop(); +assert.commandWorked(t.ensureIndex({a: 1, b: 1})); +assert.writeOK(t.insert({a: 1, b: 1})); +assert.writeOK(t.insert({a: 1, b: 2})); +assert.writeOK(t.insert({a: 2, b: 3})); +assert.writeOK(t.insert({a: 2, b: 4})); + +var cursor = t.find({a: {$in: [1,2]}}).sort({b: 1}).batchSize(2); +cursor.next(); +cursor.next(); + +assert.writeOK(t.remove({a: 2, b: 3})); + +var count = cursor.itcount(); +assert(count === 1 || count === 2); + +// Case #4: Merge sort with mutation invalidation. +t.drop(); +assert.commandWorked(t.ensureIndex({a: 1, b: 1})); +assert.writeOK(t.insert({a: 1, b: 1})); +assert.writeOK(t.insert({a: 1, b: 2})); +assert.writeOK(t.insert({a: 2, b: 3})); +assert.writeOK(t.insert({a: 2, b: 4})); + +var cursor = t.find({a: {$in: [1,2]}}).sort({b: 1}).batchSize(2); +cursor.next(); +cursor.next(); + +assert.writeOK(t.update({a: 2, b: 3}, {$set: {a: 6}})); + +// Either the cursor should skip the result that no longer matches, or we should get back the old +// version of the doc. +assert(cursor.hasNext()); +assert(cursor.next()["a"] === 2); +if (cursor.hasNext()) { + assert(cursor.next()["a"] === 2); +} +assert(!cursor.hasNext()); diff --git a/src/mongo/db/exec/and_common-inl.h b/src/mongo/db/exec/and_common-inl.h index 7f133301a22..02956df537a 100644 --- a/src/mongo/db/exec/and_common-inl.h +++ b/src/mongo/db/exec/and_common-inl.h @@ -34,6 +34,9 @@ namespace mongo { * If src has any data dest doesn't, add that data to dest. */ static void mergeFrom(WorkingSetMember* dest, const WorkingSetMember& src) { + // Both 'src' and 'dest' must have a RecordId (and they must be the same RecordId), as + // we should have just matched them according to this RecordId while doing an + // intersection. verify(dest->hasLoc()); verify(src.hasLoc()); verify(dest->loc == src.loc); @@ -54,20 +57,14 @@ namespace mongo { if (src.hasObj()) { // 'src' has the full document but 'dest' doesn't so we need to copy it over. - // - // The source diskloc must be in the "diskloc and unowned object" state rather than - // the "owned object" state. This is because we've just intersected according to - // diskloc. Since we merge based on finding working set members with matching - // disklocs, we shouldn't have a WSM that is missing the diskloc. - invariant(WorkingSetMember::LOC_AND_UNOWNED_OBJ == src.state); - - // Copy the object to 'dest'. dest->obj = src.obj; // We have an object so we don't need key data. dest->keyData.clear(); - // 'dest' should be LOC_AND_UNOWNED_OBJ + // 'dest' should have the same state as 'src'. If 'src' has an unowned obj, then + // 'dest' also should have an unowned obj; if 'src' has an owned obj, then dest + // should also have an owned obj. dest->state = src.state; // Now 'dest' has the full object. No more work to do. diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 0bddd25ce6f..088fa9fab75 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -62,10 +62,10 @@ namespace mongo { _specificStats.direction = params.direction; // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used - // for anything other than passing up NEED_FETCH. We use the loc and unowned obj state, but + // for anything other than passing up NEED_FETCH. We use the loc and owned obj state, but // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. WorkingSetMember* member = _workingSet->get(_wsidForFetch); - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; } PlanStage::StageState CollectionScan::work(WorkingSetID* out) { diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 1765a698a60..81295361c01 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -90,6 +90,18 @@ namespace mongo { return PlanStage::FAILURE; } RecordId rloc = member->loc; + + // If the working set member is in the owned obj with loc state, then the document may + // have already been deleted after-being force-fetched. + if (WorkingSetMember::LOC_AND_OWNED_OBJ == member->state) { + BSONObj deletedDoc; + if (!_collection->findDoc(_txn, rloc, &deletedDoc)) { + // Doc is already deleted. Nothing more to do. + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + } + _ws->free(id); BSONObj deletedDoc; diff --git a/src/mongo/db/exec/merge_sort.cpp b/src/mongo/db/exec/merge_sort.cpp index d175d612787..f6e0e735a08 100644 --- a/src/mongo/db/exec/merge_sort.cpp +++ b/src/mongo/db/exec/merge_sort.cpp @@ -181,7 +181,6 @@ namespace mongo { // But don't return it if it's flagged. if (_ws->isFlagged(*out)) { - _ws->free(*out); return PlanStage::NEED_TIME; } diff --git a/src/mongo/db/exec/multi_iterator.cpp b/src/mongo/db/exec/multi_iterator.cpp index 9f5d0268c44..676f64f7e92 100644 --- a/src/mongo/db/exec/multi_iterator.cpp +++ b/src/mongo/db/exec/multi_iterator.cpp @@ -43,10 +43,10 @@ namespace mongo { _ws(ws), _wsidForFetch(_ws->allocate()) { // We pre-allocate a WSM and use it to pass up fetch requests. This should never be used - // for anything other than passing up NEED_FETCH. We use the loc and unowned obj state, but + // for anything other than passing up NEED_FETCH. We use the loc and owned obj state, but // the loc isn't really pointing at any obj. The obj field of the WSM should never be used. WorkingSetMember* member = _ws->get(_wsidForFetch); - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; } void MultiIteratorStage::addIterator(RecordIterator* it) { diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp index 037b20c28aa..95053c8c8da 100644 --- a/src/mongo/db/exec/text.cpp +++ b/src/mongo/db/exec/text.cpp @@ -224,8 +224,7 @@ namespace mongo { invariant(1 == wsm->keyData.size()); invariant(wsm->hasLoc()); IndexKeyDatum& keyDatum = wsm->keyData.back(); - addTerm(keyDatum.keyData, wsm->loc); - _ws->free(id); + addTerm(keyDatum.keyData, id); return PlanStage::NEED_TIME; } else if (PlanStage::IS_EOF == childState) { @@ -270,31 +269,40 @@ namespace mongo { } // Filter for phrases and negative terms, score and truncate. - RecordId loc = _scoreIterator->first; - double score = _scoreIterator->second; + TextRecordData textRecordData = _scoreIterator->second; + WorkingSetMember* wsm = _ws->get(textRecordData.wsid); _scoreIterator++; // Ignore non-matched documents. - if (score < 0) { + if (textRecordData.score < 0) { + _ws->free(textRecordData.wsid); return PlanStage::NEED_TIME; } - // Fetch the document - BSONObj doc(_params.index->getCollection()->docFor(_txn, loc)); + // Retrieve the document. We may already have the document due to force-fetching before + // a yield. If not, then we fetch the document here. + BSONObj doc; + if (wsm->hasObj()) { + doc = wsm->obj; + } + else { + doc = _params.index->getCollection()->docFor(_txn, wsm->loc); + wsm->obj = doc; + wsm->keyData.clear(); + wsm->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + } // Filter for phrases and negated terms if (_params.query.hasNonTermPieces()) { if (!_ftsMatcher.matchesNonTerm(doc)) { + _ws->free(textRecordData.wsid); return PlanStage::NEED_TIME; } } - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->loc = loc; - member->obj = doc; - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - member->addComputed(new TextScoreComputedData(score)); + // Populate the working set member with the text score and return it. + wsm->addComputed(new TextScoreComputedData(textRecordData.score)); + *out = textRecordData.wsid; return PlanStage::ADVANCED; } @@ -356,8 +364,24 @@ namespace mongo { bool* _fetched; }; - void TextStage::addTerm(const BSONObj& key, const RecordId& loc) { - double *documentAggregateScore = &_scores[loc]; + void TextStage::addTerm(const BSONObj key, WorkingSetID wsid) { + WorkingSetMember* wsm = _ws->get(wsid); + TextRecordData* textRecordData = &_scores[wsm->loc]; + + if (WorkingSet::INVALID_ID == textRecordData->wsid) { + // We haven't seen this RecordId before. Keep the working set member around + // (it may be force-fetched on saveState()). + textRecordData->wsid = wsid; + } + else { + // We already have a working set member for this RecordId. Free the old + // WSM and retrieve the new one. + invariant(wsid != textRecordData->wsid); + _ws->free(wsid); + wsm = _ws->get(textRecordData->wsid); + } + + double* documentAggregateScore = &textRecordData->score; ++_specificStats.keysExamined; @@ -371,7 +395,7 @@ namespace mongo { BSONElement scoreElement = keyIt.next(); double documentTermScore = scoreElement.number(); - + // Handle filtering. if (*documentAggregateScore < 0) { // We have already rejected this document. @@ -383,10 +407,10 @@ namespace mongo { // We have not seen this document before and need to apply a filter. bool fetched = false; TextMatchableDocument tdoc(_txn, - _params.index->keyPattern(), - key, - loc, - _params.index->getCollection(), + _params.index->keyPattern(), + key, + wsm->loc, + _params.index->getCollection(), &fetched); if (!_filter->matches(&tdoc)) { diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h index a4e851bbedb..764abf41002 100644 --- a/src/mongo/db/exec/text.h +++ b/src/mongo/db/exec/text.h @@ -141,7 +141,7 @@ namespace mongo { * score) pair for this document. Also rejects documents that don't match this stage's * filter. */ - void addTerm(const BSONObj& key, const RecordId& loc); + void addTerm(const BSONObj key, WorkingSetID wsid); /** * Possibly return a result. FYI, this may perform a fetch directly if it is needed to @@ -178,10 +178,17 @@ namespace mongo { // Which _scanners are we currently reading from? size_t _currentIndexScanner; + // Map each buffered record id to this data. + struct TextRecordData { + TextRecordData() : wsid(WorkingSet::INVALID_ID), score(0.0) { } + WorkingSetID wsid; + double score; + }; + // Temporary score data filled out by sub-scans. Used in READING_TERMS and // RETURNING_RESULTS. - // Maps from diskloc -> aggregate score for doc. - typedef unordered_map<RecordId, double, RecordId::Hasher> ScoreMap; + // Maps from diskloc -> (aggregate score for doc, wsid). + typedef unordered_map<RecordId, TextRecordData, RecordId::Hasher> ScoreMap; ScoreMap _scores; ScoreMap::const_iterator _scoreIterator; }; diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index a2821decefd..1f1b79a705d 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -762,8 +762,33 @@ namespace mongo { invariant(member->hasObj()); oldObj = member->obj; + // If the working set member is in the owned obj with loc state, then 'oldObj' may not + // be the latest version in the database. In this case, we must refetch the doc from the + // collection. We also must be tolerant of the possibility that the doc at the wsm's + // RecordId was deleted or updated after being force-fetched. + if (WorkingSetMember::LOC_AND_OWNED_OBJ == member->state) { + if (!_collection->findDoc(_txn, loc, &oldObj)) { + // The doc was deleted after the force-fetch, so we just move on. + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + + // We need to make sure that the doc still matches the predicate, as it may have + // been updated since being force-fetched. + // + // 'cq' may be NULL in the case of idhack updates. In this case, doc-level locking + // storage engines will look up the key in the _id index and fetch the keyed + // document in a single work() cyle. Since yielding cannot happen between these + // two events, the OperationContext protects from the doc changing under our feet. + CanonicalQuery* cq = _params.canonicalQuery; + if (cq && !cq->root()->matchesBSON(oldObj, NULL)) { + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + } + // If we're here, then we have retrieved both a RecordId and the corresponding - // unowned object from the child stage. Since we have the object and the diskloc, + // object from the child stage. Since we have the object and the diskloc, // we can free the WSM. _ws->free(id); diff --git a/src/mongo/db/exec/working_set.cpp b/src/mongo/db/exec/working_set.cpp index bf624ffa917..6fd3a37a0b6 100644 --- a/src/mongo/db/exec/working_set.cpp +++ b/src/mongo/db/exec/working_set.cpp @@ -102,6 +102,75 @@ namespace mongo { _flagged.clear(); } + // + // Iteration + // + + WorkingSet::iterator::iterator(WorkingSet* ws, size_t index) + : _ws(ws), + _index(index) { + // If we're currently not pointing at an allocated member, then we have + // to advance to the first one, unless we're already at the end. + if (_index < _ws->_data.size() && isFree()) { + advance(); + } + } + + void WorkingSet::iterator::advance() { + // Move forward at least once in the data list. + _index++; + + // While we haven't hit the end and the current member is not in use. (Skips ahead until + // we find the next allocated member.) + while (_index < _ws->_data.size() && isFree()) { + _index++; + } + } + + bool WorkingSet::iterator::isFree() const { + return _ws->_data[_index].nextFreeOrSelf != _index; + } + + void WorkingSet::iterator::free() { + dassert(!isFree()); + _ws->free(_index); + } + + void WorkingSet::iterator::operator++() { + dassert(_index < _ws->_data.size()); + advance(); + } + + bool WorkingSet::iterator::operator==(const WorkingSet::iterator& other) const { + return (_index == other._index); + } + + bool WorkingSet::iterator::operator!=(const WorkingSet::iterator& other) const { + return (_index != other._index); + } + + WorkingSetMember& WorkingSet::iterator::operator*() { + dassert(_index < _ws->_data.size() && !isFree()); + return *_ws->_data[_index].member; + } + + WorkingSetMember* WorkingSet::iterator::operator->() { + dassert(_index < _ws->_data.size() && !isFree()); + return _ws->_data[_index].member; + } + + WorkingSet::iterator WorkingSet::begin() { + return WorkingSet::iterator(this, 0); + } + + WorkingSet::iterator WorkingSet::end() { + return WorkingSet::iterator(this, _data.size()); + } + + // + // WorkingSetMember + // + WorkingSetMember::WorkingSetMember() : state(WorkingSetMember::INVALID) { } WorkingSetMember::~WorkingSetMember() { } @@ -117,7 +186,7 @@ namespace mongo { } bool WorkingSetMember::hasLoc() const { - return state == LOC_AND_IDX || state == LOC_AND_UNOWNED_OBJ; + return state == LOC_AND_IDX || state == LOC_AND_UNOWNED_OBJ || state == LOC_AND_OWNED_OBJ; } bool WorkingSetMember::hasObj() const { @@ -125,7 +194,7 @@ namespace mongo { } bool WorkingSetMember::hasOwnedObj() const { - return state == OWNED_OBJ; + return state == OWNED_OBJ || state == LOC_AND_OWNED_OBJ; } bool WorkingSetMember::hasUnownedObj() const { diff --git a/src/mongo/db/exec/working_set.h b/src/mongo/db/exec/working_set.h index 8704d16e44b..d8c8c6f3524 100644 --- a/src/mongo/db/exec/working_set.h +++ b/src/mongo/db/exec/working_set.h @@ -108,6 +108,58 @@ namespace mongo { */ void clear(); + // + // Iteration + // + + /** + * Forward iterates over the list of working set members, skipping any entries + * that are on the free list. + */ + class iterator { + public: + iterator(WorkingSet* ws, size_t index); + + void operator++(); + + bool operator==(const WorkingSet::iterator& other) const; + bool operator!=(const WorkingSet::iterator& other) const; + + WorkingSetMember& operator*(); + + WorkingSetMember* operator->(); + + /** + * Free the WSM we are currently pointing to. Does not advance the iterator. + * + * It is invalid to dereference the iterator after calling free until the iterator is + * next incremented. + */ + void free(); + + private: + /** + * Move the iterator forward to the next allocated WSM. + */ + void advance(); + + /** + * Returns true if the MemberHolder currently pointed at by the iterator is free, and + * false if it contains an allocated working set member. + */ + bool isFree() const; + + // The working set we're iterating over. Not owned here. + WorkingSet* _ws; + + // The index of the member we're currently pointing at. + size_t _index; + }; + + WorkingSet::iterator begin(); + + WorkingSet::iterator end(); + private: struct MemberHolder { MemberHolder(); @@ -220,6 +272,13 @@ namespace mongo { // RecordId has been invalidated, or the obj doesn't correspond to an on-disk document // anymore (e.g. is a computed expression). OWNED_OBJ, + + // Due to a yield, RecordId is no longer protected by the storage engine's transaction + // and may have been invalidated. The object is either identical to the object keyed + // by RecordId, or is an old version of the document stored at RecordId. + // + // Only used by doc-level locking storage engines (not used by MMAP v1). + LOC_AND_OWNED_OBJ, }; // diff --git a/src/mongo/db/exec/working_set_common.cpp b/src/mongo/db/exec/working_set_common.cpp index cd382e00298..5ad9a650baf 100644 --- a/src/mongo/db/exec/working_set_common.cpp +++ b/src/mongo/db/exec/working_set_common.cpp @@ -51,6 +51,42 @@ namespace mongo { } // static + void WorkingSetCommon::forceFetchAllLocs(OperationContext* txn, + WorkingSet* workingSet, + const Collection* collection) { + invariant(collection); + + for (WorkingSet::iterator it = workingSet->begin(); it != workingSet->end(); ++it) { + if (WorkingSetMember::LOC_AND_OWNED_OBJ == it->state) { + // Already in our desired state. + continue; + } + + // We can't do anything without a RecordId. + if (!it->hasLoc()) { + continue; + } + + // Do the fetch. It is possible in normal operation for the object keyed by this + // member's RecordId to no longer be present in the collection. Consider the case of a + // delete operation with three possible plans. During the course of plan selection, + // each candidate plan creates a working set member for document D. Then plan P wins, + // and starts to delete the matching documents, including D. The working set members for + // D created by the two rejected are still present, but their RecordIds no longer refer + // to a valid document. + BSONObj fetchedDoc; + if (!collection->findDoc(txn, it->loc, &fetchedDoc)) { + // Leftover working set members pointing to old docs can be safely freed. + it.free(); + continue; + } + + it->obj = fetchedDoc.getOwned(); + it->state = WorkingSetMember::LOC_AND_OWNED_OBJ; + } + } + + // static void WorkingSetCommon::completeFetch(OperationContext* txn, WorkingSetMember* member, const Collection* collection) { @@ -59,7 +95,7 @@ namespace mongo { // If the diskloc was invalidated during fetch, then a "forced fetch" already converted this // WSM into the owned object state. In this case, there is nothing more to do here. - if (WorkingSetMember::OWNED_OBJ == member->state) { + if (member->hasOwnedObj()) { return; } diff --git a/src/mongo/db/exec/working_set_common.h b/src/mongo/db/exec/working_set_common.h index 88f41a0653f..96ccb333c0a 100644 --- a/src/mongo/db/exec/working_set_common.h +++ b/src/mongo/db/exec/working_set_common.h @@ -44,6 +44,19 @@ namespace mongo { const Collection* collection); /** + * Iterates over 'workingSet'. For all valid working set members, if the member has a + * RecordId but does not have an owned obj, then puts the member in "loc with owned + * obj" state. + * + * This "force-fetching" is called on saveState() for storage-engines that support document- + * level locking. This ensures that all WS members are still valid, even after the + * OperationContext becomes invalid due to a yield. + */ + static void forceFetchAllLocs(OperationContext* txn, + WorkingSet* workingSet, + const Collection* collection); + + /** * After a NEED_FETCH is requested, this is used to actually retrieve the document * corresponding to 'member' from 'collection', and to set the state of 'member' * appropriately. diff --git a/src/mongo/db/exec/working_set_test.cpp b/src/mongo/db/exec/working_set_test.cpp index 25e1f8e8778..120e74a89ab 100644 --- a/src/mongo/db/exec/working_set_test.cpp +++ b/src/mongo/db/exec/working_set_test.cpp @@ -147,4 +147,100 @@ namespace { ASSERT_FALSE(member->getFieldDotted("y", &elt)); } + // + // WorkingSet::iterator tests + // + + TEST(WorkingSetIteratorTest, BasicIteratorTest) { + WorkingSet ws; + + WorkingSetID id1 = ws.allocate(); + WorkingSetMember* member1 = ws.get(id1); + member1->state = WorkingSetMember::LOC_AND_IDX; + member1->keyData.push_back(IndexKeyDatum(BSON("a" << 1), BSON("" << 3))); + + WorkingSetID id2 = ws.allocate(); + WorkingSetMember* member2 = ws.get(id2); + member2->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member2->obj = BSON("a" << 3); + + int counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + ASSERT(it->state == WorkingSetMember::LOC_AND_IDX || + it->state == WorkingSetMember::LOC_AND_UNOWNED_OBJ); + counter++; + } + ASSERT_EQ(counter, 2); + } + + TEST(WorkingSetIteratorTest, EmptyWorkingSet) { + WorkingSet ws; + + int counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + counter++; + } + ASSERT_EQ(counter, 0); + } + + TEST(WorkingSetIteratorTest, EmptyWorkingSetDueToFree) { + WorkingSet ws; + + WorkingSetID id = ws.allocate(); + ws.free(id); + + int counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + counter++; + } + ASSERT_EQ(counter, 0); + } + + TEST(WorkingSetIteratorTest, MixedFreeAndInUse) { + WorkingSet ws; + + WorkingSetID id1 = ws.allocate(); + WorkingSetID id2 = ws.allocate(); + WorkingSetID id3 = ws.allocate(); + + WorkingSetMember* member = ws.get(id2); + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member->obj = BSON("a" << 3); + + ws.free(id1); + ws.free(id3); + + int counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + ASSERT(it->state == WorkingSetMember::LOC_AND_UNOWNED_OBJ); + counter++; + } + ASSERT_EQ(counter, 1); + } + + TEST(WorkingSetIteratorTest, FreeWhileIterating) { + WorkingSet ws; + + ws.allocate(); + ws.allocate(); + ws.allocate(); + + // Free the last two members during iteration. + int counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + if (counter > 0) { + it.free(); + } + counter++; + } + ASSERT_EQ(counter, 3); + + // Verify that only one item remains in the working set. + counter = 0; + for (WorkingSet::iterator it = ws.begin(); it != ws.end(); ++it) { + counter++; + } + ASSERT_EQ(counter, 1); + } + } // namespace diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index da113bbe72f..6d1ceffeb05 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -38,6 +38,7 @@ #include "mongo/db/exec/subplan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/storage/record_fetcher.h" @@ -239,6 +240,18 @@ namespace mongo { _root->saveState(); } + // Doc-locking storage engines drop their transactional context after saving state. + // The query stages inside this stage tree might buffer record ids (e.g. text, geoNear, + // mergeSort, sort) which are no longer protected by the storage engine's transactional + // boundaries. Force-fetch the documents for any such record ids so that we have our + // own copy in the working set. + // + // This is not necessary for covered plans, as such plans never use buffered record ids + // for index or collection lookup. + if (supportsDocLocking() && _collection && (!_qs.get() || _qs->root->fetched())) { + WorkingSetCommon::forceFetchAllLocs(_opCtx, _workingSet.get(), _collection); + } + _opCtx = NULL; } diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index e495fa6aed0..6a49beb6edc 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -225,7 +225,8 @@ namespace mongo { virtual void appendToString(mongoutils::str::stream* ss, int indent) const; - // text's return is LOC_AND_UNOWNED_OBJ so it's fetched and has all fields. + // Text's return is LOC_AND_UNOWNED_OBJ or LOC_AND_OWNED_OBJ so it's fetched and has all + // fields. bool fetched() const { return true; } bool hasField(const std::string& field) const { return true; } bool sortedByDiskLoc() const { return false; } @@ -447,9 +448,9 @@ namespace mongo { virtual void appendToString(mongoutils::str::stream* ss, int indent) const; /** - * This node changes the type to OWNED_OBJ. There's no fetching possible after this. + * Data from the projection node is considered fetch iff the child provides fetched data. */ - bool fetched() const { return true; } + bool fetched() const { return children[0]->fetched(); } bool hasField(const std::string& field) const { // TODO: Returning false isn't always the right answer -- we may either be including @@ -709,7 +710,7 @@ namespace mongo { virtual StageType getType() const { return STAGE_COUNT_SCAN; } virtual void appendToString(mongoutils::str::stream* ss, int indent) const; - bool fetched() const { return true; } + bool fetched() const { return false; } bool hasField(const std::string& field) const { return true; } bool sortedByDiskLoc() const { return false; } const BSONObjSet& getSort() const { return sorts; } |