diff options
23 files changed, 87 insertions, 940 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c91df3a29de..c242abd8d26 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -960,7 +960,6 @@ env.Library( 'exec/geo_near.cpp', 'exec/idhack.cpp', 'exec/index_scan.cpp', - 'exec/keep_mutations.cpp', 'exec/limit.cpp', 'exec/merge_sort.cpp', 'exec/multi_iterator.cpp', diff --git a/src/mongo/db/exec/and_hash.cpp b/src/mongo/db/exec/and_hash.cpp index 50ccd06f105..0ae93e9ad3a 100644 --- a/src/mongo/db/exec/and_hash.cpp +++ b/src/mongo/db/exec/and_hash.cpp @@ -207,12 +207,10 @@ PlanStage::StageState AndHashStage::doWork(WorkingSetID* out) { // We know that we've ADVANCED. See if the WSM is in our table. WorkingSetMember* member = _ws->get(*out); - // Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything - // with this WSM. - if (!member->hasRecordId()) { - _ws->flagForReview(*out); - return PlanStage::NEED_TIME; - } + // The child must give us a WorkingSetMember with a record id, since we intersect index keys + // based on the record id. The planner ensures that the child stage can never produce an WSM + // with no record id. + invariant(member->hasRecordId()); DataMap::iterator it = _dataMap.find(member->recordId); if (_dataMap.end() == it) { @@ -252,12 +250,10 @@ PlanStage::StageState AndHashStage::readFirstChild(WorkingSetID* out) { if (PlanStage::ADVANCED == childStatus) { WorkingSetMember* member = _ws->get(id); - // Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything - // with this WSM. - if (!member->hasRecordId()) { - _ws->flagForReview(id); - return PlanStage::NEED_TIME; - } + // The child must give us a WorkingSetMember with a record id, since we intersect index keys + // based on the record id. The planner ensures that the child stage can never produce an WSM + // with no record id. 
+ invariant(member->hasRecordId()); if (!_dataMap.insert(std::make_pair(member->recordId, id)).second) { // Didn't insert because we already had this RecordId inside the map. This should only @@ -311,14 +307,11 @@ PlanStage::StageState AndHashStage::hashOtherChildren(WorkingSetID* out) { if (PlanStage::ADVANCED == childStatus) { WorkingSetMember* member = _ws->get(id); - // Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything - // with this WSM. - if (!member->hasRecordId()) { - _ws->flagForReview(id); - return PlanStage::NEED_TIME; - } + // The child must give us a WorkingSetMember with a record id, since we intersect index keys + // based on the record id. The planner ensures that the child stage can never produce an + // WSM with no record id. + invariant(member->hasRecordId()); - verify(member->hasRecordId()); if (_dataMap.end() == _dataMap.find(member->recordId)) { // Ignore. It's not in any previous child. } else { @@ -390,6 +383,8 @@ PlanStage::StageState AndHashStage::hashOtherChildren(WorkingSetID* out) { } } +// TODO SERVER-16857: Delete this method, as the invalidation mechanism was only needed for the +// MMAPv1 storage engine. void AndHashStage::doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { @@ -405,7 +400,6 @@ void AndHashStage::doInvalidate(OperationContext* opCtx, WorkingSetMember* member = _ws->get(_lookAheadResults[i]); if (member->hasRecordId() && member->recordId == dl) { WorkingSetCommon::fetchAndInvalidateRecordId(opCtx, member, _collection); - _ws->flagForReview(_lookAheadResults[i]); _lookAheadResults[i] = WorkingSet::INVALID_ID; } } @@ -435,9 +429,6 @@ void AndHashStage::doInvalidate(OperationContext* opCtx, // The RecordId is about to be invalidated. Fetch it and clear the RecordId. WorkingSetCommon::fetchAndInvalidateRecordId(opCtx, member, _collection); - // Add the WSID to the to-be-reviewed list in the WS. 
- _ws->flagForReview(id); - // And don't return it from this stage. _dataMap.erase(it); } diff --git a/src/mongo/db/exec/and_sorted.cpp b/src/mongo/db/exec/and_sorted.cpp index 50a0bee1715..b5e3cfa24fd 100644 --- a/src/mongo/db/exec/and_sorted.cpp +++ b/src/mongo/db/exec/and_sorted.cpp @@ -96,14 +96,10 @@ PlanStage::StageState AndSortedStage::getTargetRecordId(WorkingSetID* out) { if (PlanStage::ADVANCED == state) { WorkingSetMember* member = _ws->get(id); - // Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything - // with this WSM. - if (!member->hasRecordId()) { - _ws->flagForReview(id); - return PlanStage::NEED_TIME; - } - - verify(member->hasRecordId()); + // The child must give us a WorkingSetMember with a record id, since we intersect index keys + // based on the record id. The planner ensures that the child stage can never produce an WSM + // with no record id. + invariant(member->hasRecordId()); // We have a value from one child to AND with. _targetNode = 0; @@ -158,14 +154,10 @@ PlanStage::StageState AndSortedStage::moveTowardTargetRecordId(WorkingSetID* out if (PlanStage::ADVANCED == state) { WorkingSetMember* member = _ws->get(id); - // Maybe the child had an invalidation. We intersect RecordId(s) so we can't do anything - // with this WSM. - if (!member->hasRecordId()) { - _ws->flagForReview(id); - return PlanStage::NEED_TIME; - } - - verify(member->hasRecordId()); + // The child must give us a WorkingSetMember with a record id, since we intersect index keys + // based on the record id. The planner ensures that the child stage can never produce an WSM + // with no record id. + invariant(member->hasRecordId()); if (member->recordId == _targetRecordId) { // The front element has hit _targetRecordId. 
Don't move it forward anymore/work on @@ -236,6 +228,8 @@ PlanStage::StageState AndSortedStage::moveTowardTargetRecordId(WorkingSetID* out } +// TODO SERVER-16857: Delete this method, as the invalidation mechanism was only needed for the +// MMAPv1 storage engine. void AndSortedStage::doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) { @@ -254,7 +248,6 @@ void AndSortedStage::doInvalidate(OperationContext* opCtx, // The RecordId could still be a valid result so flag it and save it for later. WorkingSetCommon::fetchAndInvalidateRecordId(opCtx, _ws->get(_targetId), _collection); - _ws->flagForReview(_targetId); _targetId = WorkingSet::INVALID_ID; _targetNode = numeric_limits<size_t>::max(); diff --git a/src/mongo/db/exec/keep_mutations.cpp b/src/mongo/db/exec/keep_mutations.cpp deleted file mode 100644 index 24c7f492e96..00000000000 --- a/src/mongo/db/exec/keep_mutations.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. 
You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/exec/keep_mutations.h" - -#include "mongo/db/exec/filter.h" -#include "mongo/db/exec/scoped_timer.h" -#include "mongo/stdx/memory.h" - -namespace mongo { - -using std::unique_ptr; -using std::vector; -using stdx::make_unique; - -// static -const char* KeepMutationsStage::kStageType = "KEEP_MUTATIONS"; - -KeepMutationsStage::KeepMutationsStage(OperationContext* opCtx, - const MatchExpression* filter, - WorkingSet* ws, - PlanStage* child) - : PlanStage(kStageType, opCtx), - _workingSet(ws), - _filter(filter), - _doneReadingChild(false), - _doneReturningFlagged(false) { - _children.emplace_back(child); -} - -KeepMutationsStage::~KeepMutationsStage() {} - -bool KeepMutationsStage::isEOF() { - return _doneReadingChild && _doneReturningFlagged; -} - -PlanStage::StageState KeepMutationsStage::doWork(WorkingSetID* out) { - // If we've returned as many results as we're limited to, isEOF will be true. - if (isEOF()) { - return PlanStage::IS_EOF; - } - - // Stream child results until the child is all done. - if (!_doneReadingChild) { - StageState status = child()->work(out); - - // Child is still returning results. Pass them through. - if (PlanStage::IS_EOF != status) { - return status; - } - - // Child is EOF. We want to stream flagged results if there are any. - _doneReadingChild = true; - - // Read out all of the flagged results from the working set. 
We can't iterate through - // the working set's flagged result set directly, since it may be modified later if - // further documents are invalidated during a yield. - std::copy(_workingSet->getFlagged().begin(), - _workingSet->getFlagged().end(), - std::back_inserter(_flagged)); - _flaggedIterator = _flagged.begin(); - } - - // We're streaming flagged results. - invariant(!_doneReturningFlagged); - if (_flaggedIterator == _flagged.end()) { - _doneReturningFlagged = true; - return PlanStage::IS_EOF; - } - - WorkingSetID idToTest = *_flaggedIterator; - _flaggedIterator++; - - WorkingSetMember* member = _workingSet->get(idToTest); - if (Filter::passes(member, _filter)) { - *out = idToTest; - return PlanStage::ADVANCED; - } else { - _workingSet->free(idToTest); - return PlanStage::NEED_TIME; - } -} - -unique_ptr<PlanStageStats> KeepMutationsStage::getStats() { - _commonStats.isEOF = isEOF(); - unique_ptr<PlanStageStats> ret = - make_unique<PlanStageStats>(_commonStats, STAGE_KEEP_MUTATIONS); - ret->children.emplace_back(child()->getStats()); - return ret; -} - -const SpecificStats* KeepMutationsStage::getSpecificStats() const { - return NULL; -} - -} // namespace mongo diff --git a/src/mongo/db/exec/keep_mutations.h b/src/mongo/db/exec/keep_mutations.h deleted file mode 100644 index cadfc02940c..00000000000 --- a/src/mongo/db/exec/keep_mutations.h +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - - -#include "mongo/db/exec/plan_stage.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/matcher/expression.h" -#include "mongo/db/record_id.h" - -namespace mongo { - -/** - * KeepMutationsStage passes all of its child's data through until the child is EOF. - * It then returns all flagged elements in the WorkingSet that pass the stage's filter. - * - * This stage is used to merge results that are invalidated mid-query back into the query - * results when possible. The query planner is responsible for determining when it's valid to - * merge these results. 
- */ -class KeepMutationsStage final : public PlanStage { -public: - KeepMutationsStage(OperationContext* opCtx, - const MatchExpression* filter, - WorkingSet* ws, - PlanStage* child); - ~KeepMutationsStage(); - - bool isEOF() final; - StageState doWork(WorkingSetID* out) final; - - StageType stageType() const final { - return STAGE_KEEP_MUTATIONS; - } - - std::unique_ptr<PlanStageStats> getStats() final; - - const SpecificStats* getSpecificStats() const final; - - static const char* kStageType; - -private: - // Not owned here. - WorkingSet* _workingSet; - - // Not owned here. Should be the full query expression tree. - const MatchExpression* _filter; - - // We read from our child... - bool _doneReadingChild; - - // ...until it's out of results, at which point we put any flagged results back in the query - // stream. - bool _doneReturningFlagged; - - // Our copy of the working set's flagged results. - std::vector<WorkingSetID> _flagged; - - // Iterator pointing into _flagged. - std::vector<WorkingSetID>::const_iterator _flaggedIterator; -}; - -} // namespace mongo diff --git a/src/mongo/db/exec/working_set.cpp b/src/mongo/db/exec/working_set.cpp index 1dc83678df0..a66a184e440 100644 --- a/src/mongo/db/exec/working_set.cpp +++ b/src/mongo/db/exec/working_set.cpp @@ -80,21 +80,6 @@ void WorkingSet::free(WorkingSetID i) { _freeList = i; } -void WorkingSet::flagForReview(WorkingSetID i) { - WorkingSetMember* member = get(i); - verify(WorkingSetMember::OWNED_OBJ == member->_state); - _flagged.insert(i); -} - -const stdx::unordered_set<WorkingSetID>& WorkingSet::getFlagged() const { - return _flagged; -} - -bool WorkingSet::isFlagged(WorkingSetID id) const { - invariant(id < _data.size()); - return _flagged.end() != _flagged.find(id); -} - void WorkingSet::clear() { for (size_t i = 0; i < _data.size(); i++) { delete _data[i].member; @@ -105,7 +90,6 @@ void WorkingSet::clear() { // point to nothing. 
_freeList = INVALID_ID; - _flagged.clear(); _yieldSensitiveIds.clear(); } diff --git a/src/mongo/db/exec/working_set.h b/src/mongo/db/exec/working_set.h index 1de7a062e6c..a2a719208bb 100644 --- a/src/mongo/db/exec/working_set.h +++ b/src/mongo/db/exec/working_set.h @@ -48,11 +48,6 @@ typedef size_t WorkingSetID; * All data in use by a query. Data is passed through the stage tree by referencing the ID of * an element of the working set. Stages can add elements to the working set, delete elements * from the working set, or mutate elements in the working set. - * - * Concurrency Notes: - * flagForReview() can only be called with a write lock covering the collection this WorkingSet - * is for. All other methods should only be called by the thread owning this WorkingSet while - * holding the read lock covering the collection. */ class WorkingSet { MONGO_DISALLOW_COPYING(WorkingSet); @@ -92,26 +87,6 @@ public: void free(WorkingSetID i); /** - * The RecordId in WSM 'i' was invalidated while being processed. Any predicates over the - * WSM could not be fully evaluated, so the WSM may or may not satisfy them. As such, if we - * wish to output the WSM, we must do some clean-up work later. Adds the WSM with id 'i' to - * the list of flagged WSIDs. - * - * The WSM must be in the state OWNED_OBJ. - */ - void flagForReview(WorkingSetID i); - - /** - * Return true if the provided ID is flagged. - */ - bool isFlagged(WorkingSetID id) const; - - /** - * Return the set of all WSIDs passed to flagForReview. - */ - const stdx::unordered_set<WorkingSetID>& getFlagged() const; - - /** * Removes and deallocates all members of this working set. */ void clear(); @@ -125,16 +100,16 @@ public: void transitionToOwnedObj(WorkingSetID id); /** - * Returns the list of working set ids that have transitioned into the RID_AND_IDX or - * RID_AND_OBJ state since the last yield. 
The members corresponding to these ids may have since - * transitioned to a different state or been freed, so these cases must be handled by the - * caller. The list may also contain duplicates. + * Returns the list of working set ids that have transitioned into the RID_AND_IDX state since + * the last yield. The members corresponding to these ids may have since transitioned to a + * different state or been freed, so these cases must be handled by the caller. The list may + * also contain duplicates. * * Execution stages are *not* responsible for managing this list, as working set ids are added - * to the set automatically by WorkingSet::transitionToRecordIdAndIdx() and - * WorkingSet::transitionToRecordIdAndObj(). + * to the set automatically by WorkingSet::transitionToRecordIdAndIdx(). * - * As a side effect, calling this method clears the list of flagged ids kept by the working set. + * As a side effect, calling this method clears the list of flagged yield sensitive ids kept by + * the working set. */ std::vector<WorkingSetID> getAndClearYieldSensitiveIds(); @@ -159,9 +134,6 @@ private: // If _freeList == INVALID_ID, the free list is empty and all elements in _data are in use. WorkingSetID _freeList; - // An insert-only set of WorkingSetIDs that have been flagged for review. - stdx::unordered_set<WorkingSetID> _flagged; - // Contains ids of WSMs that may need to be adjusted when we next yield. std::vector<WorkingSetID> _yieldSensitiveIds; }; diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 2b604ac59bf..75b41f5071f 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -195,15 +195,10 @@ void fillOutPlannerParams(OperationContext* opCtx, plannerParams->options |= QueryPlannerParams::SPLIT_LIMITED_SORT; - // Doc-level locking storage engines cannot answer predicates implicitly via exact index - // bounds for index intersection plans, as this can lead to spurious matches. 
- // - // Such storage engines do not use the invalidation framework, and therefore - // have no need for KEEP_MUTATIONS. + // Doc-level locking storage engines cannot answer predicates implicitly via exact index bounds + // for index intersection plans, as this can lead to spurious matches. if (supportsDocLocking()) { plannerParams->options |= QueryPlannerParams::CANNOT_TRIM_IXISECT; - } else { - plannerParams->options |= QueryPlannerParams::KEEP_MUTATIONS; } if (shouldWaitForOplogVisibility( diff --git a/src/mongo/db/query/planner_analysis.cpp b/src/mongo/db/query/planner_analysis.cpp index 6852210a506..846dca9062b 100644 --- a/src/mongo/db/query/planner_analysis.cpp +++ b/src/mongo/db/query/planner_analysis.cpp @@ -673,53 +673,6 @@ std::unique_ptr<QuerySolution> QueryPlannerAnalysis::analyzeDataAccess( const QueryRequest& qr = query.getQueryRequest(); - // If we can (and should), add the keep mutations stage. - - // We cannot keep mutated documents if: - // - // 1. The query requires an index to evaluate the predicate ($text). We can't tell whether - // or not the doc actually satisfies the $text predicate since we can't evaluate a - // text MatchExpression. - // - // 2. The query implies a sort ($geoNear). It would be rather expensive and hacky to merge - // the document at the right place. - // - // 3. There is an index-provided sort. Ditto above comment about merging. - // - // 4. There is a SORT that is not at the root of solution tree. Ditto above comment about - // merging. - // - // TODO: do we want some kind of pre-planning step where we look for certain nodes and cache - // them? We do lookups in the tree a few times. This may not matter as most trees are - // shallow in terms of query nodes. 
- const bool hasNotRootSort = hasSortStage && STAGE_SORT != solnRoot->getType(); - - const bool cannotKeepFlagged = hasNode(solnRoot.get(), STAGE_TEXT) || - hasNode(solnRoot.get(), STAGE_GEO_NEAR_2D) || - hasNode(solnRoot.get(), STAGE_GEO_NEAR_2DSPHERE) || - (!qr.getSort().isEmpty() && !hasSortStage) || hasNotRootSort; - - // Only index intersection stages ever produce flagged results. - const bool couldProduceFlagged = hasAndHashStage || hasNode(solnRoot.get(), STAGE_AND_SORTED); - - const bool shouldAddMutation = !cannotKeepFlagged && couldProduceFlagged; - - if (shouldAddMutation && (params.options & QueryPlannerParams::KEEP_MUTATIONS)) { - KeepMutationsNode* keep = new KeepMutationsNode(); - - // We must run the entire expression tree to make sure the document is still valid. - keep->filter = query.root()->shallowClone(); - - if (STAGE_SORT == solnRoot->getType()) { - // We want to insert the invalidated results before the sort stage, if there is one. - verify(1 == solnRoot->children.size()); - keep->children.push_back(solnRoot->children[0]); - solnRoot->children[0] = keep; - } else { - keep->children.push_back(solnRoot.release()); - solnRoot.reset(keep); - } - } if (qr.getSkip()) { auto skip = std::make_unique<SkipNode>(); diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index e1bf324c0d9..b4eaa5188f5 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -111,9 +111,6 @@ string optionString(size_t options) { case QueryPlannerParams::INDEX_INTERSECTION: ss << "INDEX_INTERSECTION "; break; - case QueryPlannerParams::KEEP_MUTATIONS: - ss << "KEEP_MUTATIONS "; - break; case QueryPlannerParams::IS_COUNT: ss << "IS_COUNT "; break; diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index 85da68100bd..6d31af0b9e0 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -70,35 +70,31 @@ struct 
QueryPlannerParams { // Set this if you want to turn on index intersection. INDEX_INTERSECTION = 1 << 4, - // Set this if you want to try to keep documents deleted or mutated during the execution - // of the query in the query results. - KEEP_MUTATIONS = 1 << 5, - // Indicate to the planner that the caller is requesting a count operation, possibly through // a count command, or as part of an aggregation pipeline. - IS_COUNT = 1 << 6, + IS_COUNT = 1 << 5, // Set this if you want to handle batchSize properly with sort(). If limits on SORT // stages are always actually limits, then this should be left off. If they are // sometimes to be interpreted as batchSize, then this should be turned on. - SPLIT_LIMITED_SORT = 1 << 7, + SPLIT_LIMITED_SORT = 1 << 6, // Set this to prevent the planner from generating plans which answer a predicate // implicitly via exact index bounds for index intersection solutions. - CANNOT_TRIM_IXISECT = 1 << 8, + CANNOT_TRIM_IXISECT = 1 << 7, // Set this if you don't want any plans with a non-covered projection stage. All projections // must be provided/covered by an index. - NO_UNCOVERED_PROJECTIONS = 1 << 10, + NO_UNCOVERED_PROJECTIONS = 1 << 8, // Set this to generate covered whole IXSCAN plans. - GENERATE_COVERED_IXSCANS = 1 << 11, + GENERATE_COVERED_IXSCANS = 1 << 9, // Set this to track the most recent timestamp seen by this cursor while scanning the oplog. - TRACK_LATEST_OPLOG_TS = 1 << 12, + TRACK_LATEST_OPLOG_TS = 1 << 10, // Set this so that collection scans on the oplog wait for visibility before reading. - OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 13, + OPLOG_SCAN_WAIT_FOR_VISIBLE = 1 << 11, }; // See Options enum above. 
diff --git a/src/mongo/db/query/query_planner_test.cpp b/src/mongo/db/query/query_planner_test.cpp index bcf7d2d2368..a754d00f155 100644 --- a/src/mongo/db/query/query_planner_test.cpp +++ b/src/mongo/db/query/query_planner_test.cpp @@ -3674,141 +3674,6 @@ TEST_F(QueryPlannerTest, IntersectCompoundInsteadUnusedField2) { "{ixscan: {filter: null, pattern: {a:1,b:1,c:1}}}}}"); } -// -// Test that we add a KeepMutations when we should and and we don't add one when we shouldn't. -// - -// Collection scan doesn't keep any state, so it can't produce flagged data. -TEST_F(QueryPlannerTest, NoMutationsForCollscan) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - runQuery(fromjson("")); - assertSolutionExists("{cscan: {dir: 1}}"); -} - -// Collscan + sort doesn't produce flagged data either. -TEST_F(QueryPlannerTest, NoMutationsForSort) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - runQuerySortProj(fromjson(""), fromjson("{a:1}"), BSONObj()); - assertSolutionExists( - "{sort: {pattern: {a: 1}, limit: 0, node: {sortKeyGen: {node: " - "{cscan: {dir: 1}}}}}}"); -} - -// A basic index scan, fetch, and sort plan cannot produce flagged data. -TEST_F(QueryPlannerTest, MutationsFromFetchWithSort) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - addIndex(BSON("a" << 1)); - runQuerySortProj(fromjson("{a: 5}"), fromjson("{b:1}"), BSONObj()); - assertSolutionExists( - "{sort: {pattern: {b:1}, limit: 0, node: {sortKeyGen: {node: " - "{fetch: {node: {ixscan: {pattern: {a:1}}}}}}}}}"); -} - -// Index scan w/covering doesn't require a keep node. -TEST_F(QueryPlannerTest, NoFetchNoKeep) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - addIndex(BSON("x" << 1)); - // query, sort, proj - runQuerySortProj(fromjson("{ x : {$gt: 1}}"), BSONObj(), fromjson("{_id: 0, x: 1}")); - - // cscan is a soln but we override the params that say to include it. 
- ASSERT_EQUALS(getNumSolutions(), 1U); - assertSolutionExists( - "{proj: {spec: {_id: 0, x: 1}, node: {ixscan: " - "{filter: null, pattern: {x: 1}}}}}"); -} - -// No keep with geoNear. -TEST_F(QueryPlannerTest, NoKeepWithGeoNear) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - addIndex(BSON("a" - << "2d")); - runQuery(fromjson("{a: {$near: [0,0], $maxDistance:0.3 }}")); - ASSERT_EQUALS(getNumSolutions(), 1U); - assertSolutionExists("{geoNear2d: {a: '2d'}}"); -} - -// No keep when we have an indexed sort. -TEST_F(QueryPlannerTest, NoKeepWithIndexedSort) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - addIndex(BSON("a" << 1 << "b" << 1)); - runQuerySortProjSkipNToReturn(fromjson("{a: {$in: [1, 2]}}"), BSON("b" << 1), BSONObj(), 0, 1); - - // cscan solution exists but we didn't turn on the "always include a collscan." - assertNumSolutions(1); - assertSolutionExists( - "{fetch: {node: {mergeSort: {nodes: " - "[{ixscan: {pattern: {a: 1, b: 1}}}, {ixscan: {pattern: {a: 1, b: 1}}}]}}}}"); -} - -// No KeepMutations when we have a sort that is not root, like the ntoreturn hack. -TEST_F(QueryPlannerTest, NoKeepWithNToReturn) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - params.options |= QueryPlannerParams::SPLIT_LIMITED_SORT; - addIndex(BSON("a" << 1)); - runQuerySortProjSkipNToReturn(fromjson("{a: 1}"), fromjson("{b: 1}"), BSONObj(), 0, 3); - - assertSolutionExists( - "{ensureSorted: {pattern: {b: 1}, node: " - "{or: {nodes: [" - "{sort: {pattern: {b: 1}, limit: 3, node: {sortKeyGen: {node: " - "{fetch: {node: {ixscan: {pattern: {a: 1}}}}}}}}}, " - "{sort: {pattern: {b: 1}, limit: 0, node: {sortKeyGen: {node: " - "{fetch: {node: {ixscan: {pattern: {a: 1}}}}}}}}}]}}}}"); -} - -// Mergesort plans do not require a keep mutations stage. 
-TEST_F(QueryPlannerTest, NoKeepWithMergeSort) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - - addIndex(BSON("a" << 1 << "b" << 1)); - runQuerySortProj(fromjson("{a: {$in: [1, 2]}}"), BSON("b" << 1), BSONObj()); - - assertNumSolutions(1U); - assertSolutionExists( - "{fetch: {filter: null, node: {mergeSort: {nodes: [" - "{ixscan: {pattern: {a: 1, b: 1}," - "bounds: {a: [[1,1,true,true]], b: [['MinKey','MaxKey',true,true]]}}}," - "{ixscan: {pattern: {a: 1, b: 1}," - "bounds: {a: [[2,2,true,true]], b: [['MinKey','MaxKey',true,true]]}}}]}}}}"); -} - -// Hash-based index intersection plans require a keep mutations stage. -TEST_F(QueryPlannerTest, AndHashRequiresKeepMutations) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - params.options |= QueryPlannerParams::INDEX_INTERSECTION; - - addIndex(BSON("a" << 1)); - addIndex(BSON("b" << 1)); - runQuery(fromjson("{a: {$gte: 0}, b: {$gte: 0}}")); - - assertNumSolutions(3U); - assertSolutionExists("{fetch: {filter: {a: {$gte: 0}}, node: {ixscan: {pattern: {b: 1}}}}}"); - assertSolutionExists("{fetch: {filter: {b: {$gte: 0}}, node: {ixscan: {pattern: {a: 1}}}}}"); - assertSolutionExists( - "{fetch: {filter: null, node: {keep: {node: {andHash: {nodes: [" - "{ixscan: {pattern: {a: 1}}}," - "{ixscan: {pattern: {b: 1}}}]}}}}}}"); -} - -// Sort-based index intersection plans require a keep mutations stage. 
-TEST_F(QueryPlannerTest, AndSortedRequiresKeepMutations) { - params.options = QueryPlannerParams::KEEP_MUTATIONS; - params.options |= QueryPlannerParams::INDEX_INTERSECTION; - - addIndex(BSON("a" << 1)); - addIndex(BSON("b" << 1)); - runQuery(fromjson("{a: 2, b: 3}")); - - assertNumSolutions(3U); - assertSolutionExists("{fetch: {filter: {a: 2}, node: {ixscan: {pattern: {b: 1}}}}}"); - assertSolutionExists("{fetch: {filter: {b: 3}, node: {ixscan: {pattern: {a: 1}}}}}"); - assertSolutionExists( - "{fetch: {filter: null, node: {keep: {node: {andSorted: {nodes: [" - "{ixscan: {pattern: {a: 1}}}," - "{ixscan: {pattern: {b: 1}}}]}}}}}}"); -} - // Make sure a top-level $or hits the limiting number // of solutions that we are willing to consider. TEST_F(QueryPlannerTest, OrEnumerationLimit) { diff --git a/src/mongo/db/query/query_planner_test_lib.cpp b/src/mongo/db/query/query_planner_test_lib.cpp index be3e8871b9c..1a7029a6428 100644 --- a/src/mongo/db/query/query_planner_test_lib.cpp +++ b/src/mongo/db/query/query_planner_test_lib.cpp @@ -658,22 +658,6 @@ bool QueryPlannerTestLib::solutionMatches(const BSONObj& testSoln, } return (limitEl.numberInt() == ln->limit) && solutionMatches(child.Obj(), ln->children[0]); - } else if (STAGE_KEEP_MUTATIONS == trueSoln->getType()) { - const KeepMutationsNode* kn = static_cast<const KeepMutationsNode*>(trueSoln); - - BSONElement el = testSoln["keep"]; - if (el.eoo() || !el.isABSONObj()) { - return false; - } - BSONObj keepObj = el.Obj(); - - // Doesn't have any parameters really. 
- BSONElement child = keepObj["node"]; - if (child.eoo() || !child.isABSONObj()) { - return false; - } - - return solutionMatches(child.Obj(), kn->children[0]); } else if (STAGE_SHARDING_FILTER == trueSoln->getType()) { const ShardingFilterNode* fn = static_cast<const ShardingFilterNode*>(trueSoln); diff --git a/src/mongo/db/query/query_solution.cpp b/src/mongo/db/query/query_solution.cpp index 4fa0b62bace..39b5045c2ea 100644 --- a/src/mongo/db/query/query_solution.cpp +++ b/src/mongo/db/query/query_solution.cpp @@ -1066,35 +1066,6 @@ QuerySolutionNode* ShardingFilterNode::clone() const { } // -// KeepMutationsNode -// - -void KeepMutationsNode::appendToString(mongoutils::str::stream* ss, int indent) const { - addIndent(ss, indent); - *ss << "KEEP_MUTATIONS\n"; - if (NULL != filter) { - addIndent(ss, indent + 1); - StringBuilder sb; - *ss << "filter:\n"; - filter->debugString(sb, indent + 2); - *ss << sb.str(); - } - addCommon(ss, indent); - addIndent(ss, indent + 1); - *ss << "Child:" << '\n'; - children[0]->appendToString(ss, indent + 2); -} - -QuerySolutionNode* KeepMutationsNode::clone() const { - KeepMutationsNode* copy = new KeepMutationsNode(); - cloneBaseData(copy); - - copy->sorts = this->sorts; - - return copy; -} - -// // DistinctNode // diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 0877be76724..6373ae01494 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -848,44 +848,6 @@ struct ShardingFilterNode : public QuerySolutionNode { }; /** - * If documents mutate or are deleted during a query, we can (in some cases) fetch them - * and still return them. This stage merges documents that have been mutated or deleted - * into the query result stream. 
- */ -struct KeepMutationsNode : public QuerySolutionNode { - KeepMutationsNode() : sorts(SimpleBSONObjComparator::kInstance.makeBSONObjSet()) {} - - virtual ~KeepMutationsNode() {} - - virtual StageType getType() const { - return STAGE_KEEP_MUTATIONS; - } - virtual void appendToString(mongoutils::str::stream* ss, int indent) const; - - // Any flagged results are OWNED_OBJ and therefore we're covered if our child is. - bool fetched() const { - return children[0]->fetched(); - } - - // Any flagged results are OWNED_OBJ and as such they'll have any field we need. - bool hasField(const std::string& field) const { - return children[0]->hasField(field); - } - - bool sortedByDiskLoc() const { - return false; - } - const BSONObjSet& getSort() const { - return sorts; - } - - QuerySolutionNode* clone() const; - - // Since we merge in flagged results we have no sort order. - BSONObjSet sorts; -}; - -/** * Distinct queries only want one value for a given field. We run an index scan but * *always* skip over the current key to the next key. 
*/ diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 16b4be88766..1f00fab5d4f 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -45,7 +45,6 @@ #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/geo_near.h" #include "mongo/db/exec/index_scan.h" -#include "mongo/db/exec/keep_mutations.h" #include "mongo/db/exec/limit.h" #include "mongo/db/exec/merge_sort.h" #include "mongo/db/exec/or.h" @@ -299,14 +298,6 @@ PlanStage* buildStages(OperationContext* opCtx, ws, childStage); } - case STAGE_KEEP_MUTATIONS: { - const KeepMutationsNode* km = static_cast<const KeepMutationsNode*>(root); - PlanStage* childStage = buildStages(opCtx, collection, cq, qsol, km->children[0], ws); - if (nullptr == childStage) { - return nullptr; - } - return new KeepMutationsStage(opCtx, km->filter.get(), ws, childStage); - } case STAGE_DISTINCT_SCAN: { const DistinctNode* dn = static_cast<const DistinctNode*>(root); diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index 610aadbdeaf..398d077b76e 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -58,10 +58,6 @@ enum StageType { STAGE_EOF, - // This is more of an "internal-only" stage where we try to keep docs that were mutated - // during query execution. - STAGE_KEEP_MUTATIONS, - STAGE_FETCH, // The two $geoNear impls imply a fetch+sort and must be stages. 
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 05822f6a69c..9a3b1d505ee 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -91,7 +91,6 @@ dbtest = env.Program( 'query_stage_ensure_sorted.cpp', 'query_stage_fetch.cpp', 'query_stage_ixscan.cpp', - 'query_stage_keep.cpp', 'query_stage_limit_skip.cpp', 'query_stage_merge_sort.cpp', 'query_stage_near.cpp', diff --git a/src/mongo/dbtests/plan_ranking.cpp b/src/mongo/dbtests/plan_ranking.cpp index 01d7b6af527..467add0901b 100644 --- a/src/mongo/dbtests/plan_ranking.cpp +++ b/src/mongo/dbtests/plan_ranking.cpp @@ -111,8 +111,6 @@ public: QueryPlannerParams plannerParams; fillOutPlannerParams(&_opCtx, collection, cq, &plannerParams); - // Turn this off otherwise it pops up in some plans. - plannerParams.options &= ~QueryPlannerParams::KEEP_MUTATIONS; // Plan. auto statusWithSolutions = QueryPlanner::plan(*cq, plannerParams); diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index d18346cb21b..31aaf656a06 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -163,10 +163,10 @@ private: // /** - * Invalidate a RecordId held by a hashed AND before the AND finishes evaluating. The AND should - * process all other data just fine and flag the invalidated RecordId in the WorkingSet. + * Delete a RecordId held by a hashed AND before the AND finishes evaluating. The AND should + * return the result despite its deletion. */ -class QueryStageAndHashInvalidation : public QueryStageAndBase { +class QueryStageAndHashDeleteDuringYield : public QueryStageAndBase { public: void run() { dbtests::WriteContextForTests ctx(&_opCtx, ns()); @@ -188,7 +188,7 @@ public: WorkingSet ws; auto ah = make_unique<AndHashStage>(&_opCtx, &ws, coll); - // Foo <= 20 + // Foo <= 20. 
IndexScanParams params; params.descriptor = getIndex(BSON("foo" << 1), coll); params.bounds.isSimpleRange = true; @@ -198,7 +198,7 @@ public: params.direction = -1; ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); - // Bar >= 10 + // Bar >= 10. params.descriptor = getIndex(BSON("bar" << 1), coll); params.bounds.startKey = BSON("" << 10); params.bounds.endKey = BSONObj(); @@ -206,24 +206,21 @@ public: params.direction = 1; ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); - // ah reads the first child into its hash table. - // ah should read foo=20, foo=19, ..., foo=0 in that order. - // Read half of them... + // 'ah' reads the first child into its hash table: foo=20, foo=19, ..., foo=0 + // in that order. Read half of them. for (int i = 0; i < 10; ++i) { WorkingSetID out; PlanStage::StageState status = ah->work(&out); ASSERT_EQUALS(PlanStage::NEED_TIME, status); } - // ...yield + // Save state and delete one of the read objects. ah->saveState(); - // ...invalidate one of the read objects set<RecordId> data; getRecordIds(&data, coll); size_t memUsageBefore = ah->getMemUsage(); for (set<RecordId>::const_iterator it = data.begin(); it != data.end(); ++it) { if (coll->docFor(&_opCtx, *it).value()["foo"].numberInt() == 15) { - ah->invalidate(&_opCtx, *it, INVALIDATION_DELETION); remove(coll->docFor(&_opCtx, *it).value()); break; } @@ -231,23 +228,13 @@ public: size_t memUsageAfter = ah->getMemUsage(); ah->restoreState(); - // Invalidating a read object should decrease memory usage. - ASSERT_LESS_THAN(memUsageAfter, memUsageBefore); - - // And expect to find foo==15 it flagged for review. - const stdx::unordered_set<WorkingSetID>& flagged = ws.getFlagged(); - ASSERT_EQUALS(size_t(1), flagged.size()); + // The deleted result should still be buffered inside the AND_HASH stage, so there should be + // no change in memory consumption. + ASSERT_EQ(memUsageAfter, memUsageBefore); - // Expect to find the right value of foo in the flagged item. 
- WorkingSetMember* member = ws.get(*flagged.begin()); - ASSERT_TRUE(NULL != member); - ASSERT_EQUALS(WorkingSetMember::OWNED_OBJ, member->getState()); - BSONElement elt; - ASSERT_TRUE(member->getFieldDotted("foo", &elt)); - ASSERT_EQUALS(15, elt.numberInt()); - - // Now, finish up the AND. Since foo == bar, we would have 11 results, but we subtract - // one because of a mid-plan invalidation, so 10. + // Now, finish up the AND. We expect 10 results. Although the deleted result is still + // buffered, the {bar: 1} index scan won't encounter the deleted document, and hence the + // document won't appear in the result set. int count = 0; while (!ah->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; @@ -257,7 +244,8 @@ public: } ++count; - member = ws.get(id); + BSONElement elt; + WorkingSetMember* member = ws.get(id); ASSERT_TRUE(member->getFieldDotted("foo", &elt)); ASSERT_LESS_THAN_OR_EQUALS(elt.numberInt(), 20); @@ -270,8 +258,8 @@ public: } }; -// Invalidate one of the "are we EOF?" lookahead results. -class QueryStageAndHashInvalidateLookahead : public QueryStageAndBase { +// Delete one of the "are we EOF?" lookahead results while the plan is yielded. +class QueryStageAndHashDeleteLookaheadDuringYield : public QueryStageAndBase { public: void run() { dbtests::WriteContextForTests ctx(&_opCtx, ns()); @@ -294,7 +282,7 @@ public: WorkingSet ws; auto ah = make_unique<AndHashStage>(&_opCtx, &ws, coll); - // Foo <= 20 (descending) + // Foo <= 20 (descending). IndexScanParams params; params.descriptor = getIndex(BSON("foo" << 1), coll); params.bounds.isSimpleRange = true; @@ -304,45 +292,39 @@ public: params.direction = -1; ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); - // Bar <= 19 (descending) + // Bar <= 19 (descending). params.descriptor = getIndex(BSON("bar" << 1), coll); params.bounds.startKey = BSON("" << 19); ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); - // First call to work reads the first result from the children. 
- // The first result is for the first scan over foo is {foo: 20, bar: 20, baz: 20}. - // The first result is for the second scan over bar is {foo: 19, bar: 19, baz: 19}. + // First call to work reads the first result from the children. The first result for the + // first scan over foo is {foo: 20, bar: 20, baz: 20}. The first result for the second scan + // over bar is {foo: 19, bar: 19, baz: 19}. WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status = ah->work(&id); ASSERT_EQUALS(PlanStage::NEED_TIME, status); - const stdx::unordered_set<WorkingSetID>& flagged = ws.getFlagged(); - ASSERT_EQUALS(size_t(0), flagged.size()); - - // "delete" deletedObj (by invalidating the RecordId of the obj that matches it). + // Delete 'deletedObj' from the collection. BSONObj deletedObj = BSON("_id" << 20 << "foo" << 20 << "bar" << 20 << "baz" << 20); ah->saveState(); set<RecordId> data; getRecordIds(&data, coll); size_t memUsageBefore = ah->getMemUsage(); - for (set<RecordId>::const_iterator it = data.begin(); it != data.end(); ++it) { - if (0 == deletedObj.woCompare(coll->docFor(&_opCtx, *it).value())) { - ah->invalidate(&_opCtx, *it, INVALIDATION_DELETION); + for (auto&& recordId : data) { + if (0 == deletedObj.woCompare(coll->docFor(&_opCtx, recordId).value())) { + remove(coll->docFor(&_opCtx, recordId).value()); break; } } + // The deletion should not affect the amount of data buffered inside the AND_HASH stage. size_t memUsageAfter = ah->getMemUsage(); - // Look ahead results do not count towards memory usage. ASSERT_EQUALS(memUsageBefore, memUsageAfter); ah->restoreState(); - // The deleted obj should show up in flagged. - ASSERT_EQUALS(size_t(1), flagged.size()); - - // And not in our results. + // We expect that the deleted document does not appear in our result set. 
int count = 0; while (!ah->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; @@ -1003,10 +985,9 @@ public: // /** - * Invalidate a RecordId held by a sorted AND before the AND finishes evaluating. The AND should - * process all other data just fine and flag the invalidated RecordId in the WorkingSet. + * Delete a RecordId held by a sorted AND before the AND finishes evaluating. */ -class QueryStageAndSortedInvalidation : public QueryStageAndBase { +class QueryStageAndSortedDeleteDuringYield : public QueryStageAndBase { public: void run() { dbtests::WriteContextForTests ctx(&_opCtx, ns()); @@ -1018,7 +999,7 @@ public: wuow.commit(); } - // Insert a bunch of data + // Insert a bunch of data. for (int i = 0; i < 50; ++i) { insert(BSON("foo" << 1 << "bar" << 1)); } @@ -1028,7 +1009,7 @@ public: WorkingSet ws; auto ah = make_unique<AndSortedStage>(&_opCtx, &ws, coll); - // Scan over foo == 1 + // Scan over foo == 1. IndexScanParams params; params.descriptor = getIndex(BSON("foo" << 1), coll); params.bounds.isSimpleRange = true; @@ -1038,7 +1019,7 @@ public: params.direction = 1; ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); - // Scan over bar == 1 + // Scan over bar == 1. params.descriptor = getIndex(BSON("bar" << 1), coll); ah->addChild(new IndexScan(&_opCtx, params, &ws, NULL)); @@ -1047,32 +1028,20 @@ public: getRecordIds(&data, coll); // We're making an assumption here that happens to be true because we clear out the - // collection before running this: increasing inserts have increasing RecordIds. - // This isn't true in general if the collection is not dropped beforehand. + // collection before running this: increasing inserts have increasing RecordIds. This isn't + // true in general if the collection is not dropped beforehand. WorkingSetID id = WorkingSet::INVALID_ID; // Sorted AND looks at the first child, which is an index scan over foo==1. 
ah->work(&id); // The first thing that the index scan returns (due to increasing RecordId trick) is the - // very first insert, which should be the very first thing in data. Let's invalidate it - // and make sure it shows up in the flagged results. + // very first insert, which should be the very first thing in data. Delete it. ah->saveState(); - ah->invalidate(&_opCtx, *data.begin(), INVALIDATION_DELETION); remove(coll->docFor(&_opCtx, *data.begin()).value()); ah->restoreState(); - // Make sure the nuked obj is actually in the flagged data. - ASSERT_EQUALS(ws.getFlagged().size(), size_t(1)); - WorkingSetMember* member = ws.get(*ws.getFlagged().begin()); - ASSERT_EQUALS(WorkingSetMember::OWNED_OBJ, member->getState()); - BSONElement elt; - ASSERT_TRUE(member->getFieldDotted("foo", &elt)); - ASSERT_EQUALS(1, elt.numberInt()); - ASSERT_TRUE(member->getFieldDotted("bar", &elt)); - ASSERT_EQUALS(1, elt.numberInt()); - - set<RecordId>::iterator it = data.begin(); + auto it = data.begin(); // Proceed along, AND-ing results. int count = 0; @@ -1085,8 +1054,9 @@ public: ++count; ++it; - member = ws.get(id); + WorkingSetMember* member = ws.get(id); + BSONElement elt; ASSERT_TRUE(member->getFieldDotted("foo", &elt)); ASSERT_EQUALS(1, elt.numberInt()); ASSERT_TRUE(member->getFieldDotted("bar", &elt)); @@ -1098,14 +1068,12 @@ public: for (int i = 0; i < count + 10; ++i) { ++it; } - // Remove a result that's coming up. It's not the 'target' result of the AND so it's - // not flagged. + // Remove a result that's coming up. ah->saveState(); - ah->invalidate(&_opCtx, *it, INVALIDATION_DELETION); remove(coll->docFor(&_opCtx, *it).value()); ah->restoreState(); - // Get all results aside from the two we killed. + // Get all results aside from the two we deleted. 
while (!ah->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status = ah->work(&id); @@ -1114,8 +1082,9 @@ public: } ++count; - member = ws.get(id); + WorkingSetMember* member = ws.get(id); + BSONElement elt; ASSERT_TRUE(member->getFieldDotted("foo", &elt)); ASSERT_EQUALS(1, elt.numberInt()); ASSERT_TRUE(member->getFieldDotted("bar", &elt)); @@ -1123,8 +1092,6 @@ public: } ASSERT_EQUALS(count, 48); - - ASSERT_EQUALS(size_t(1), ws.getFlagged().size()); } }; @@ -1453,7 +1420,7 @@ public: All() : Suite("query_stage_and") {} void setupTests() { - add<QueryStageAndHashInvalidation>(); + add<QueryStageAndHashDeleteDuringYield>(); add<QueryStageAndHashTwoLeaf>(); add<QueryStageAndHashTwoLeafFirstChildLargeKeys>(); add<QueryStageAndHashTwoLeafLastChildLargeKeys>(); @@ -1461,11 +1428,11 @@ public: add<QueryStageAndHashThreeLeafMiddleChildLargeKeys>(); add<QueryStageAndHashWithNothing>(); add<QueryStageAndHashProducesNothing>(); - add<QueryStageAndHashInvalidateLookahead>(); + add<QueryStageAndHashDeleteLookaheadDuringYield>(); add<QueryStageAndHashFirstChildFetched>(); add<QueryStageAndHashSecondChildFetched>(); add<QueryStageAndHashDeadChild>(); - add<QueryStageAndSortedInvalidation>(); + add<QueryStageAndSortedDeleteDuringYield>(); add<QueryStageAndSortedThreeLeaf>(); add<QueryStageAndSortedWithNothing>(); add<QueryStageAndSortedProducesNothing>(); diff --git a/src/mongo/dbtests/query_stage_count_scan.cpp b/src/mongo/dbtests/query_stage_count_scan.cpp index dbddae9b3b3..0173625a85d 100644 --- a/src/mongo/dbtests/query_stage_count_scan.cpp +++ b/src/mongo/dbtests/query_stage_count_scan.cpp @@ -38,7 +38,6 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/count_scan.h" -#include "mongo/db/exec/keep_mutations.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" diff --git a/src/mongo/dbtests/query_stage_keep.cpp 
b/src/mongo/dbtests/query_stage_keep.cpp deleted file mode 100644 index 76ca9a0f78c..00000000000 --- a/src/mongo/dbtests/query_stage_keep.cpp +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -/** - * This file tests db/exec/keep_mutations.cpp. 
- */ - -#include "mongo/platform/basic.h" - -#include "mongo/client/dbclient_cursor.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/client.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/exec/collection_scan.h" -#include "mongo/db/exec/eof.h" -#include "mongo/db/exec/keep_mutations.h" -#include "mongo/db/exec/plan_stage.h" -#include "mongo/db/exec/working_set.h" -#include "mongo/db/json.h" -#include "mongo/db/matcher/expression_parser.h" -#include "mongo/dbtests/dbtests.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/fail_point.h" -#include "mongo/util/fail_point_registry.h" -#include "mongo/util/fail_point_service.h" - -namespace QueryStageKeep { - -using std::set; -using std::shared_ptr; -using std::unique_ptr; -using stdx::make_unique; - -class QueryStageKeepBase { -public: - QueryStageKeepBase() : _client(&_opCtx) {} - - virtual ~QueryStageKeepBase() { - _client.dropCollection(ns()); - } - - void getLocs(set<RecordId>* out, Collection* coll) { - auto cursor = coll->getCursor(&_opCtx); - while (auto record = cursor->next()) { - out->insert(record->id); - } - } - - void insert(const BSONObj& obj) { - _client.insert(ns(), obj); - } - - void remove(const BSONObj& obj) { - _client.remove(ns(), obj); - } - - static const char* ns() { - return "unittests.QueryStageKeep"; - } - - WorkingSetID getNextResult(PlanStage* stage) { - while (!stage->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status = stage->work(&id); - if (PlanStage::ADVANCED == status) { - return id; - } - } - return WorkingSet::INVALID_ID; - } - -protected: - const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_txnPtr; - DBDirectClient _client; -}; - - -// Test that we actually merge flagged results. - -// -// Test that a fetch is passed up when it's not in memory. 
-// -class KeepStageBasic : public QueryStageKeepBase { -public: - void run() { - dbtests::WriteContextForTests ctx(&_opCtx, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(&_opCtx, ns()); - if (!coll) { - WriteUnitOfWork wuow(&_opCtx); - coll = db->createCollection(&_opCtx, ns()); - wuow.commit(); - } - - WorkingSet ws; - - // Add 10 objects to the collection. - for (size_t i = 0; i < 10; ++i) { - insert(BSON("x" << 1)); - } - - // Create 10 objects that are flagged. - for (size_t i = 0; i < 10; ++i) { - WorkingSetID id = ws.allocate(); - WorkingSetMember* member = ws.get(id); - member->obj = Snapshotted<BSONObj>(SnapshotId(), BSON("x" << 2)); - member->transitionToOwnedObj(); - ws.flagForReview(id); - } - - // Create a collscan to provide the 10 objects in the collection. - CollectionScanParams params; - params.collection = coll; - params.direction = CollectionScanParams::FORWARD; - params.tailable = false; - params.start = RecordId(); - CollectionScan* cs = new CollectionScan(&_opCtx, params, &ws, NULL); - - // Create a KeepMutations stage to merge in the 10 flagged objects. - // Takes ownership of 'cs' - MatchExpression* nullFilter = NULL; - auto keep = make_unique<KeepMutationsStage>(&_opCtx, nullFilter, &ws, cs); - - for (size_t i = 0; i < 10; ++i) { - WorkingSetID id = getNextResult(keep.get()); - WorkingSetMember* member = ws.get(id); - ASSERT_FALSE(ws.isFlagged(id)); - ASSERT_EQUALS(member->obj.value()["x"].numberInt(), 1); - } - - { - WorkingSetID out; - ASSERT_EQ(cs->work(&out), PlanStage::IS_EOF); - } - - // Flagged results *must* be at the end. - for (size_t i = 0; i < 10; ++i) { - WorkingSetID id = getNextResult(keep.get()); - WorkingSetMember* member = ws.get(id); - ASSERT(ws.isFlagged(id)); - ASSERT_EQUALS(member->obj.value()["x"].numberInt(), 2); - } - } -}; - -/** - * SERVER-15580: test that the KeepMutationsStage behaves correctly if additional results are - * flagged after some flagged results have already been returned. 
- */ -class KeepStageFlagAdditionalAfterStreamingStarts : public QueryStageKeepBase { -public: - void run() { - dbtests::WriteContextForTests ctx(&_opCtx, ns()); - - Database* db = ctx.db(); - Collection* coll = db->getCollection(&_opCtx, ns()); - if (!coll) { - WriteUnitOfWork wuow(&_opCtx); - coll = db->createCollection(&_opCtx, ns()); - wuow.commit(); - } - WorkingSet ws; - - std::set<WorkingSetID> expectedResultIds; - std::set<WorkingSetID> resultIds; - - // Create a KeepMutationsStage with an EOF child, and flag 50 objects. We expect these - // objects to be returned by the KeepMutationsStage. - MatchExpression* nullFilter = NULL; - auto keep = - make_unique<KeepMutationsStage>(&_opCtx, nullFilter, &ws, new EOFStage(&_opCtx)); - for (size_t i = 0; i < 50; ++i) { - WorkingSetID id = ws.allocate(); - WorkingSetMember* member = ws.get(id); - member->obj = Snapshotted<BSONObj>(SnapshotId(), BSON("x" << 1)); - member->transitionToOwnedObj(); - ws.flagForReview(id); - expectedResultIds.insert(id); - } - - // Call work() on the KeepMutationsStage. The stage should start streaming the - // already-flagged objects. - WorkingSetID id = getNextResult(keep.get()); - resultIds.insert(id); - - // Flag more objects, then call work() again on the KeepMutationsStage, and expect none - // of the newly-flagged objects to be returned (the KeepMutationsStage does not - // incorporate objects flagged since the streaming phase started). - // - // This condition triggers SERVER-15580 (the new flagging causes a rehash of the - // unordered_set "WorkingSet::_flagged", which invalidates all iterators, which were - // previously being dereferenced in KeepMutationsStage::work()). - // Note that stdx::unordered_set<>::insert() triggers a rehash if the new number of - // elements is greater than or equal to max_load_factor()*bucket_count(). 
- size_t rehashSize = - static_cast<size_t>(ws.getFlagged().max_load_factor() * ws.getFlagged().bucket_count()); - while (ws.getFlagged().size() <= rehashSize) { - WorkingSetID id = ws.allocate(); - WorkingSetMember* member = ws.get(id); - member->obj = Snapshotted<BSONObj>(SnapshotId(), BSON("x" << 1)); - member->transitionToOwnedObj(); - ws.flagForReview(id); - } - while ((id = getNextResult(keep.get())) != WorkingSet::INVALID_ID) { - resultIds.insert(id); - } - - // Assert that only the first 50 objects were returned. - ASSERT(expectedResultIds == resultIds); - } -}; - -class All : public Suite { -public: - All() : Suite("query_stage_keep") {} - - void setupTests() { - add<KeepStageBasic>(); - add<KeepStageFlagAdditionalAfterStreamingStarts>(); - } -}; - -SuiteInstance<All> queryStageKeepAll; - -} // namespace QueryStageKeep diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp index b0fa90d210c..dcce7668a73 100644 --- a/src/mongo/dbtests/query_stage_multiplan.cpp +++ b/src/mongo/dbtests/query_stage_multiplan.cpp @@ -388,8 +388,6 @@ TEST_F(QueryStageMultiPlanTest, MPSBackupPlan) { // Get planner params. QueryPlannerParams plannerParams; fillOutPlannerParams(_opCtx.get(), collection, cq.get(), &plannerParams); - // Turn this off otherwise it pops up in some plans. - plannerParams.options &= ~QueryPlannerParams::KEEP_MUTATIONS; // Plan. auto statusWithSolutions = QueryPlanner::plan(*cq, plannerParams); |