summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2016-01-08 17:31:20 -0500
committerDavid Storch <david.storch@10gen.com>2016-01-13 12:02:15 -0500
commitc2edffac9d7da55422e56213076fabdfbd8e7bb1 (patch)
tree2dc03ceca6d7ec4911672d189fc15ede997692d6 /src
parentdb54c89ec37618c11f913f01c77820f6b699b9ed (diff)
downloadmongo-c2edffac9d7da55422e56213076fabdfbd8e7bb1.tar.gz
SERVER-15962 only use KEEP_MUTATIONS for index intersection plans
It is no longer required for other plans.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/merge_sort.cpp14
-rw-r--r--src/mongo/db/query/planner_analysis.cpp6
-rw-r--r--src/mongo/db/query/query_planner_test.cpp63
-rw-r--r--src/mongo/dbtests/query_stage_merge_sort.cpp135
4 files changed, 179 insertions, 39 deletions
diff --git a/src/mongo/db/exec/merge_sort.cpp b/src/mongo/db/exec/merge_sort.cpp
index 5c1bbb64e3a..10c85b68304 100644
--- a/src/mongo/db/exec/merge_sort.cpp
+++ b/src/mongo/db/exec/merge_sort.cpp
@@ -180,11 +180,6 @@ PlanStage::StageState MergeSortStage::work(WorkingSetID* out) {
*out = idToTest;
++_commonStats.advanced;
- // But don't return it if it's flagged.
- if (_ws->isFlagged(*out)) {
- return PlanStage::NEED_TIME;
- }
-
return PlanStage::ADVANCED;
}
@@ -198,16 +193,15 @@ void MergeSortStage::doInvalidate(OperationContext* txn,
valueIt++) {
WorkingSetMember* member = _ws->get(valueIt->id);
if (member->hasLoc() && (dl == member->loc)) {
- // Force a fetch and flag. We could possibly merge this result back in later.
+ // Fetch the about-to-be mutated result.
WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection);
- _ws->flagForReview(valueIt->id);
++_specificStats.forcedFetches;
}
}
- // If we see DL again it is not the same record as it once was so we still want to
- // return it.
- if (_dedup) {
+ // If we see the deleted RecordId again it is not the same record as it once was so we still
+ // want to return it.
+ if (_dedup && INVALIDATION_DELETION == type) {
_seen.erase(dl);
}
}
diff --git a/src/mongo/db/query/planner_analysis.cpp b/src/mongo/db/query/planner_analysis.cpp
index e1a69d04530..7139b22e40a 100644
--- a/src/mongo/db/query/planner_analysis.cpp
+++ b/src/mongo/db/query/planner_analysis.cpp
@@ -677,10 +677,8 @@ QuerySolution* QueryPlannerAnalysis::analyzeDataAccess(const CanonicalQuery& que
hasNode(solnRoot, STAGE_GEO_NEAR_2D) || hasNode(solnRoot, STAGE_GEO_NEAR_2DSPHERE) ||
(!lpq.getSort().isEmpty() && !hasSortStage) || hasNotRootSort;
- // Only these stages can produce flagged results. A stage has to hold state past one call
- // to work(...) in order to possibly flag a result.
- const bool couldProduceFlagged =
- hasAndHashStage || hasNode(solnRoot, STAGE_AND_SORTED) || hasNode(solnRoot, STAGE_FETCH);
+ // Only index intersection stages ever produce flagged results.
+ const bool couldProduceFlagged = hasAndHashStage || hasNode(solnRoot, STAGE_AND_SORTED);
const bool shouldAddMutation = !cannotKeepFlagged && couldProduceFlagged;
diff --git a/src/mongo/db/query/query_planner_test.cpp b/src/mongo/db/query/query_planner_test.cpp
index 8b5e86a0b79..f8d74ccf155 100644
--- a/src/mongo/db/query/query_planner_test.cpp
+++ b/src/mongo/db/query/query_planner_test.cpp
@@ -3384,18 +3384,17 @@ TEST_F(QueryPlannerTest, NoMutationsForSort) {
"{cscan: {dir: 1}}}}}}");
}
-// An index scan + fetch requires a keep node as it can flag data. Also make sure we put it in
-// the right place, under the sort.
-TEST_F(QueryPlannerTest, MutationsFromFetch) {
+// A basic index scan, fetch, and sort plan cannot produce flagged data.
+TEST_F(QueryPlannerTest, MutationsFromFetchWithSort) {
params.options = QueryPlannerParams::KEEP_MUTATIONS;
addIndex(BSON("a" << 1));
runQuerySortProj(fromjson("{a: 5}"), fromjson("{b:1}"), BSONObj());
assertSolutionExists(
- "{sort: {pattern: {b:1}, limit: 0, node: {keep: {node: {sortKeyGen: {node: "
- "{fetch: {node: {ixscan: {pattern: {a:1}}}}}}}}}}}");
+ "{sort: {pattern: {b:1}, limit: 0, node: {sortKeyGen: {node: "
+ "{fetch: {node: {ixscan: {pattern: {a:1}}}}}}}}}");
}
-// Index scan w/covering doesn't require a keep node as there's no fetch.
+// Index scan w/covering doesn't require a keep node.
TEST_F(QueryPlannerTest, NoFetchNoKeep) {
params.options = QueryPlannerParams::KEEP_MUTATIONS;
addIndex(BSON("x" << 1));
@@ -3448,6 +3447,58 @@ TEST_F(QueryPlannerTest, NoKeepWithNToReturn) {
"{fetch: {node: {ixscan: {pattern: {a: 1}}}}}}}}}]}}}}");
}
+// Mergesort plans do not require a keep mutations stage.
+TEST_F(QueryPlannerTest, NoKeepWithMergeSort) {
+ params.options = QueryPlannerParams::KEEP_MUTATIONS;
+
+ addIndex(BSON("a" << 1 << "b" << 1));
+ runQuerySortProj(fromjson("{a: {$in: [1, 2]}}"), BSON("b" << 1), BSONObj());
+
+ assertNumSolutions(1U);
+ assertSolutionExists(
+ "{fetch: {filter: null, node: {mergeSort: {nodes: ["
+ "{ixscan: {pattern: {a: 1, b: 1},"
+ "bounds: {a: [[1,1,true,true]], b: [['MinKey','MaxKey',true,true]]}}},"
+ "{ixscan: {pattern: {a: 1, b: 1},"
+ "bounds: {a: [[2,2,true,true]], b: [['MinKey','MaxKey',true,true]]}}}]}}}}");
+}
+
+// Hash-based index intersection plans require a keep mutations stage.
+TEST_F(QueryPlannerTest, AndHashRequiresKeepMutations) {
+ params.options = QueryPlannerParams::KEEP_MUTATIONS;
+ params.options |= QueryPlannerParams::INDEX_INTERSECTION;
+
+ addIndex(BSON("a" << 1));
+ addIndex(BSON("b" << 1));
+ runQuery(fromjson("{a: {$gte: 0}, b: {$gte: 0}}"));
+
+ assertNumSolutions(3U);
+ assertSolutionExists("{fetch: {filter: {a: {$gte: 0}}, node: {ixscan: {pattern: {b: 1}}}}}");
+ assertSolutionExists("{fetch: {filter: {b: {$gte: 0}}, node: {ixscan: {pattern: {a: 1}}}}}");
+ assertSolutionExists(
+ "{fetch: {filter: null, node: {keep: {node: {andHash: {nodes: ["
+ "{ixscan: {pattern: {a: 1}}},"
+ "{ixscan: {pattern: {b: 1}}}]}}}}}}");
+}
+
+// Sort-based index intersection plans require a keep mutations stage.
+TEST_F(QueryPlannerTest, AndSortedRequiresKeepMutations) {
+ params.options = QueryPlannerParams::KEEP_MUTATIONS;
+ params.options |= QueryPlannerParams::INDEX_INTERSECTION;
+
+ addIndex(BSON("a" << 1));
+ addIndex(BSON("b" << 1));
+ runQuery(fromjson("{a: 2, b: 3}"));
+
+ assertNumSolutions(3U);
+ assertSolutionExists("{fetch: {filter: {a: 2}, node: {ixscan: {pattern: {b: 1}}}}}");
+ assertSolutionExists("{fetch: {filter: {b: 3}, node: {ixscan: {pattern: {a: 1}}}}}");
+ assertSolutionExists(
+ "{fetch: {filter: null, node: {keep: {node: {andSorted: {nodes: ["
+ "{ixscan: {pattern: {a: 1}}},"
+ "{ixscan: {pattern: {b: 1}}}]}}}}}}");
+}
+
// Make sure a top-level $or hits the limiting number
// of solutions that we are willing to consider.
TEST_F(QueryPlannerTest, OrEnumerationLimit) {
diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp
index b553ecf1269..c6fd3073ead 100644
--- a/src/mongo/dbtests/query_stage_merge_sort.cpp
+++ b/src/mongo/dbtests/query_stage_merge_sort.cpp
@@ -588,31 +588,28 @@ public:
++it;
}
- // Invalidate locs[11]. Should force a fetch. We don't get it back.
+ // Invalidate locs[11]. Should force a fetch and return the deleted document.
ms->saveState();
ms->invalidate(&_txn, *it, INVALIDATION_DELETION);
ms->restoreState();
// Make sure locs[11] was fetched for us.
{
- // TODO: If we have "return upon invalidation" ever triggerable, do the following test.
- /*
- WorkingSetID id = WorkingSet::INVALID_ID;
- PlanStage::StageState status;
- do {
- status = ms->work(&id);
- } while (PlanStage::ADVANCED != status);
-
- WorkingSetMember* member = ws.get(id);
- ASSERT(!member->hasLoc());
- ASSERT(member->hasObj());
- string index(1, 'a' + count);
- BSONElement elt;
- ASSERT_TRUE(member->getFieldDotted(index, &elt));
- ASSERT_EQUALS(1, elt.numberInt());
- ASSERT(member->getFieldDotted("foo", &elt));
- ASSERT_EQUALS(count, elt.numberInt());
- */
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState status;
+ do {
+ status = ms->work(&id);
+ } while (PlanStage::ADVANCED != status);
+
+ WorkingSetMember* member = ws.get(id);
+ ASSERT(!member->hasLoc());
+ ASSERT(member->hasObj());
+ string index(1, 'a' + count);
+ BSONElement elt;
+ ASSERT_TRUE(member->getFieldDotted(index, &elt));
+ ASSERT_EQUALS(1, elt.numberInt());
+ ASSERT(member->getFieldDotted("foo", &elt));
+ ASSERT_EQUALS(count, elt.numberInt());
++it;
++count;
@@ -640,6 +637,105 @@ public:
}
};
+// Test that if a WSM buffered inside the merge sort stage gets updated, we return the document and
+// then correctly dedup if we see the same RecordId again.
+class QueryStageMergeSortInvalidationMutationDedup : public QueryStageMergeSortTestBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ Database* db = ctx.db();
+ Collection* coll = db->getCollection(ns());
+ if (!coll) {
+ WriteUnitOfWork wuow(&_txn);
+ coll = db->createCollection(&_txn, ns());
+ wuow.commit();
+ }
+
+ // Insert data.
+ insert(BSON("_id" << 4 << "a" << 4));
+ insert(BSON("_id" << 5 << "a" << 5));
+ insert(BSON("_id" << 6 << "a" << 6));
+
+ addIndex(BSON("a" << 1));
+
+ std::set<RecordId> rids;
+ getLocs(&rids, coll);
+ set<RecordId>::iterator it = rids.begin();
+
+ WorkingSet ws;
+ WorkingSetMember* member;
+ MergeSortStageParams msparams;
+ msparams.pattern = BSON("a" << 1);
+ auto ms = stdx::make_unique<MergeSortStage>(&_txn, msparams, &ws, coll);
+
+ // First child scans [5, 10].
+ {
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("a" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 5);
+ params.bounds.endKey = BSON("" << 10);
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ auto fetchStage = stdx::make_unique<FetchStage>(
+ &_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll);
+ ms->addChild(fetchStage.release());
+ }
+
+ // Second child scans [4, 10].
+ {
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("a" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 4);
+ params.bounds.endKey = BSON("" << 10);
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ auto fetchStage = stdx::make_unique<FetchStage>(
+ &_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll);
+ ms->addChild(fetchStage.release());
+ }
+
+ // First doc should be {a: 4}.
+ member = getNextResult(&ws, ms.get());
+ ASSERT_EQ(member->getState(), WorkingSetMember::LOC_AND_OBJ);
+ ASSERT_EQ(member->loc, *it);
+ ASSERT_EQ(member->obj.value(), BSON("_id" << 4 << "a" << 4));
+ ++it;
+
+ // Doc {a: 5} gets invalidated by an update.
+ ms->invalidate(&_txn, *it, INVALIDATION_MUTATION);
+
+ // Invalidated doc {a: 5} should still get returned.
+ member = getNextResult(&ws, ms.get());
+ ASSERT_EQ(member->getState(), WorkingSetMember::OWNED_OBJ);
+ ASSERT_EQ(member->obj.value(), BSON("_id" << 5 << "a" << 5));
+ ++it;
+
+ // We correctly dedup the invalidated doc and return {a: 6} next.
+ member = getNextResult(&ws, ms.get());
+ ASSERT_EQ(member->getState(), WorkingSetMember::LOC_AND_OBJ);
+ ASSERT_EQ(member->loc, *it);
+ ASSERT_EQ(member->obj.value(), BSON("_id" << 6 << "a" << 6));
+ }
+
+private:
+ WorkingSetMember* getNextResult(WorkingSet* ws, PlanStage* stage) {
+ while (!stage->isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState status = stage->work(&id);
+ if (PlanStage::ADVANCED != status) {
+ continue;
+ }
+
+ return ws->get(id);
+ }
+
+ FAIL("Expected to produce another result but hit EOF");
+ return nullptr;
+ }
+};
+
class All : public Suite {
public:
All() : Suite("query_stage_merge_sort_test") {}
@@ -652,6 +748,7 @@ public:
add<QueryStageMergeSortOneStageEOF>();
add<QueryStageMergeSortManyShort>();
add<QueryStageMergeSortInvalidation>();
+ add<QueryStageMergeSortInvalidationMutationDedup>();
}
};