diff options
author | David Storch <david.storch@10gen.com> | 2016-01-08 17:31:20 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2016-01-13 12:02:15 -0500 |
commit | c2edffac9d7da55422e56213076fabdfbd8e7bb1 (patch) | |
tree | 2dc03ceca6d7ec4911672d189fc15ede997692d6 /src | |
parent | db54c89ec37618c11f913f01c77820f6b699b9ed (diff) | |
download | mongo-c2edffac9d7da55422e56213076fabdfbd8e7bb1.tar.gz |
SERVER-15962 only use KEEP_MUTATIONS for index intersection plans
It is no longer required for other plans.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/merge_sort.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/query/planner_analysis.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner_test.cpp | 63 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_merge_sort.cpp | 135 |
4 files changed, 179 insertions, 39 deletions
diff --git a/src/mongo/db/exec/merge_sort.cpp b/src/mongo/db/exec/merge_sort.cpp index 5c1bbb64e3a..10c85b68304 100644 --- a/src/mongo/db/exec/merge_sort.cpp +++ b/src/mongo/db/exec/merge_sort.cpp @@ -180,11 +180,6 @@ PlanStage::StageState MergeSortStage::work(WorkingSetID* out) { *out = idToTest; ++_commonStats.advanced; - // But don't return it if it's flagged. - if (_ws->isFlagged(*out)) { - return PlanStage::NEED_TIME; - } - return PlanStage::ADVANCED; } @@ -198,16 +193,15 @@ void MergeSortStage::doInvalidate(OperationContext* txn, valueIt++) { WorkingSetMember* member = _ws->get(valueIt->id); if (member->hasLoc() && (dl == member->loc)) { - // Force a fetch and flag. We could possibly merge this result back in later. + // Fetch the about-to-be mutated result. WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); - _ws->flagForReview(valueIt->id); ++_specificStats.forcedFetches; } } - // If we see DL again it is not the same record as it once was so we still want to - // return it. - if (_dedup) { + // If we see the deleted RecordId again it is not the same record as it once was so we still + // want to return it. + if (_dedup && INVALIDATION_DELETION == type) { _seen.erase(dl); } } diff --git a/src/mongo/db/query/planner_analysis.cpp b/src/mongo/db/query/planner_analysis.cpp index e1a69d04530..7139b22e40a 100644 --- a/src/mongo/db/query/planner_analysis.cpp +++ b/src/mongo/db/query/planner_analysis.cpp @@ -677,10 +677,8 @@ QuerySolution* QueryPlannerAnalysis::analyzeDataAccess(const CanonicalQuery& que hasNode(solnRoot, STAGE_GEO_NEAR_2D) || hasNode(solnRoot, STAGE_GEO_NEAR_2DSPHERE) || (!lpq.getSort().isEmpty() && !hasSortStage) || hasNotRootSort; - // Only these stages can produce flagged results. A stage has to hold state past one call - // to work(...) in order to possibly flag a result. - const bool couldProduceFlagged = - hasAndHashStage || hasNode(solnRoot, STAGE_AND_SORTED) || hasNode(solnRoot, STAGE_FETCH); + // Only index intersection stages ever produce flagged results. + const bool couldProduceFlagged = hasAndHashStage || hasNode(solnRoot, STAGE_AND_SORTED); const bool shouldAddMutation = !cannotKeepFlagged && couldProduceFlagged; diff --git a/src/mongo/db/query/query_planner_test.cpp b/src/mongo/db/query/query_planner_test.cpp index 8b5e86a0b79..f8d74ccf155 100644 --- a/src/mongo/db/query/query_planner_test.cpp +++ b/src/mongo/db/query/query_planner_test.cpp @@ -3384,18 +3384,17 @@ TEST_F(QueryPlannerTest, NoMutationsForSort) { "{cscan: {dir: 1}}}}}}"); } -// An index scan + fetch requires a keep node as it can flag data. Also make sure we put it in -// the right place, under the sort. -TEST_F(QueryPlannerTest, MutationsFromFetch) { +// A basic index scan, fetch, and sort plan cannot produce flagged data. +TEST_F(QueryPlannerTest, MutationsFromFetchWithSort) { params.options = QueryPlannerParams::KEEP_MUTATIONS; addIndex(BSON("a" << 1)); runQuerySortProj(fromjson("{a: 5}"), fromjson("{b:1}"), BSONObj()); assertSolutionExists( - "{sort: {pattern: {b:1}, limit: 0, node: {keep: {node: {sortKeyGen: {node: " - "{fetch: {node: {ixscan: {pattern: {a:1}}}}}}}}}}}"); + "{sort: {pattern: {b:1}, limit: 0, node: {sortKeyGen: {node: " + "{fetch: {node: {ixscan: {pattern: {a:1}}}}}}}}}"); } -// Index scan w/covering doesn't require a keep node as there's no fetch. +// Index scan w/covering doesn't require a keep node. TEST_F(QueryPlannerTest, NoFetchNoKeep) { params.options = QueryPlannerParams::KEEP_MUTATIONS; addIndex(BSON("x" << 1)); @@ -3448,6 +3447,58 @@ TEST_F(QueryPlannerTest, NoKeepWithNToReturn) { "{fetch: {node: {ixscan: {pattern: {a: 1}}}}}}}}}]}}}}"); } +// Mergesort plans do not require a keep mutations stage. +TEST_F(QueryPlannerTest, NoKeepWithMergeSort) { + params.options = QueryPlannerParams::KEEP_MUTATIONS; + + addIndex(BSON("a" << 1 << "b" << 1)); + runQuerySortProj(fromjson("{a: {$in: [1, 2]}}"), BSON("b" << 1), BSONObj()); + + assertNumSolutions(1U); + assertSolutionExists( + "{fetch: {filter: null, node: {mergeSort: {nodes: [" + "{ixscan: {pattern: {a: 1, b: 1}," + "bounds: {a: [[1,1,true,true]], b: [['MinKey','MaxKey',true,true]]}}}," + "{ixscan: {pattern: {a: 1, b: 1}," + "bounds: {a: [[2,2,true,true]], b: [['MinKey','MaxKey',true,true]]}}}]}}}}"); +} + +// Hash-based index intersection plans require a keep mutations stage. +TEST_F(QueryPlannerTest, AndHashRequiresKeepMutations) { + params.options = QueryPlannerParams::KEEP_MUTATIONS; + params.options |= QueryPlannerParams::INDEX_INTERSECTION; + + addIndex(BSON("a" << 1)); + addIndex(BSON("b" << 1)); + runQuery(fromjson("{a: {$gte: 0}, b: {$gte: 0}}")); + + assertNumSolutions(3U); + assertSolutionExists("{fetch: {filter: {a: {$gte: 0}}, node: {ixscan: {pattern: {b: 1}}}}}"); + assertSolutionExists("{fetch: {filter: {b: {$gte: 0}}, node: {ixscan: {pattern: {a: 1}}}}}"); + assertSolutionExists( + "{fetch: {filter: null, node: {keep: {node: {andHash: {nodes: [" + "{ixscan: {pattern: {a: 1}}}," + "{ixscan: {pattern: {b: 1}}}]}}}}}}"); +} + +// Sort-based index intersection plans require a keep mutations stage. +TEST_F(QueryPlannerTest, AndSortedRequiresKeepMutations) { + params.options = QueryPlannerParams::KEEP_MUTATIONS; + params.options |= QueryPlannerParams::INDEX_INTERSECTION; + + addIndex(BSON("a" << 1)); + addIndex(BSON("b" << 1)); + runQuery(fromjson("{a: 2, b: 3}")); + + assertNumSolutions(3U); + assertSolutionExists("{fetch: {filter: {a: 2}, node: {ixscan: {pattern: {b: 1}}}}}"); + assertSolutionExists("{fetch: {filter: {b: 3}, node: {ixscan: {pattern: {a: 1}}}}}"); + assertSolutionExists( + "{fetch: {filter: null, node: {keep: {node: {andSorted: {nodes: [" + "{ixscan: {pattern: {a: 1}}}," + "{ixscan: {pattern: {b: 1}}}]}}}}}}"); +} + // Make sure a top-level $or hits the limiting number // of solutions that we are willing to consider. TEST_F(QueryPlannerTest, OrEnumerationLimit) { diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index b553ecf1269..c6fd3073ead 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -588,31 +588,28 @@ public: ++it; } - // Invalidate locs[11]. Should force a fetch. We don't get it back. + // Invalidate locs[11]. Should force a fetch and return the deleted document. ms->saveState(); ms->invalidate(&_txn, *it, INVALIDATION_DELETION); ms->restoreState(); // Make sure locs[11] was fetched for us. { - // TODO: If we have "return upon invalidation" ever triggerable, do the following test. - /* - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status; - do { - status = ms->work(&id); - } while (PlanStage::ADVANCED != status); - - WorkingSetMember* member = ws.get(id); - ASSERT(!member->hasLoc()); - ASSERT(member->hasObj()); - string index(1, 'a' + count); - BSONElement elt; - ASSERT_TRUE(member->getFieldDotted(index, &elt)); - ASSERT_EQUALS(1, elt.numberInt()); - ASSERT(member->getFieldDotted("foo", &elt)); - ASSERT_EQUALS(count, elt.numberInt()); - */ + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status; + do { + status = ms->work(&id); + } while (PlanStage::ADVANCED != status); + + WorkingSetMember* member = ws.get(id); + ASSERT(!member->hasLoc()); + ASSERT(member->hasObj()); + string index(1, 'a' + count); + BSONElement elt; + ASSERT_TRUE(member->getFieldDotted(index, &elt)); + ASSERT_EQUALS(1, elt.numberInt()); + ASSERT(member->getFieldDotted("foo", &elt)); + ASSERT_EQUALS(count, elt.numberInt()); ++it; ++count; @@ -640,6 +637,105 @@ public: } }; +// Test that if a WSM buffered inside the merge sort stage gets updated, we return the document and +// then correctly dedup if we see the same RecordId again. +class QueryStageMergeSortInvalidationMutationDedup : public QueryStageMergeSortTestBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); + } + + // Insert data. + insert(BSON("_id" << 4 << "a" << 4)); + insert(BSON("_id" << 5 << "a" << 5)); + insert(BSON("_id" << 6 << "a" << 6)); + + addIndex(BSON("a" << 1)); + + std::set<RecordId> rids; + getLocs(&rids, coll); + set<RecordId>::iterator it = rids.begin(); + + WorkingSet ws; + WorkingSetMember* member; + MergeSortStageParams msparams; + msparams.pattern = BSON("a" << 1); + auto ms = stdx::make_unique<MergeSortStage>(&_txn, msparams, &ws, coll); + + // First child scans [5, 10]. + { + IndexScanParams params; + params.descriptor = getIndex(BSON("a" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 5); + params.bounds.endKey = BSON("" << 10); + params.bounds.endKeyInclusive = true; + params.direction = 1; + auto fetchStage = stdx::make_unique<FetchStage>( + &_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll); + ms->addChild(fetchStage.release()); + } + + // Second child scans [4, 10]. + { + IndexScanParams params; + params.descriptor = getIndex(BSON("a" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 4); + params.bounds.endKey = BSON("" << 10); + params.bounds.endKeyInclusive = true; + params.direction = 1; + auto fetchStage = stdx::make_unique<FetchStage>( + &_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll); + ms->addChild(fetchStage.release()); + } + + // First doc should be {a: 4}. + member = getNextResult(&ws, ms.get()); + ASSERT_EQ(member->getState(), WorkingSetMember::LOC_AND_OBJ); + ASSERT_EQ(member->loc, *it); + ASSERT_EQ(member->obj.value(), BSON("_id" << 4 << "a" << 4)); + ++it; + + // Doc {a: 5} gets invalidated by an update. + ms->invalidate(&_txn, *it, INVALIDATION_MUTATION); + + // Invalidated doc {a: 5} should still get returned. + member = getNextResult(&ws, ms.get()); + ASSERT_EQ(member->getState(), WorkingSetMember::OWNED_OBJ); + ASSERT_EQ(member->obj.value(), BSON("_id" << 5 << "a" << 5)); + ++it; + + // We correctly dedup the invalidated doc and return {a: 6} next. + member = getNextResult(&ws, ms.get()); + ASSERT_EQ(member->getState(), WorkingSetMember::LOC_AND_OBJ); + ASSERT_EQ(member->loc, *it); + ASSERT_EQ(member->obj.value(), BSON("_id" << 6 << "a" << 6)); + } + +private: + WorkingSetMember* getNextResult(WorkingSet* ws, PlanStage* stage) { + while (!stage->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status = stage->work(&id); + if (PlanStage::ADVANCED != status) { + continue; + } + + return ws->get(id); + } + + FAIL("Expected to produce another result but hit EOF"); + return nullptr; + } +}; + class All : public Suite { public: All() : Suite("query_stage_merge_sort_test") {} @@ -652,6 +748,7 @@ public: add<QueryStageMergeSortOneStageEOF>(); add<QueryStageMergeSortManyShort>(); add<QueryStageMergeSortInvalidation>(); + add<QueryStageMergeSortInvalidationMutationDedup>(); } }; |