summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Korshunov <anton.korshunov@mongodb.com>2021-06-01 12:22:25 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-03 08:02:19 +0000
commit1aa41b5625ae0ac2e757f3638585e20d7d81e7ca (patch)
tree76ca00b164415dc1d3d1243179906bfa50255808
parentdcd9f60e591acffeedabb5fa368f407a035473b2 (diff)
downloadmongo-1aa41b5625ae0ac2e757f3638585e20d7d81e7ca.tar.gz
SERVER-57317 Fix optimized oplog scans in SBE to correctly handle $_resumeAfter queries
-rw-r--r--jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js152
-rw-r--r--jstests/noPassthroughWithMongod/query_oplogreplay.js20
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp284
3 files changed, 327 insertions, 129 deletions
diff --git a/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js b/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js
new file mode 100644
index 00000000000..e62fa2ff094
--- /dev/null
+++ b/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js
@@ -0,0 +1,152 @@
+/**
+ * Test that optimized collection scan on the oplog collection can be applied and executed
+ * successfully.
+ * @tags: [
+ * requires_replication,
+ * ]
+ */
+(function() {
+"use strict";
+
+const rst = new ReplSetTest({nodes: 2});
+rst.startSet();
+rst.initiate();
+
+const db = rst.getPrimary().getDB("oplog_scan_optimizations");
+const localDb = rst.getPrimary().getDB("local");
+const collName = "oplog_scan_optimizations";
+const oplogCollName = "oplog.rs";
+const coll = db[collName];
+const oplogColl = localDb[oplogCollName];
+
+coll.drop();
+
+// Insert several documents so that we can use a cursor to fetch them in multiple batches.
+const testData = [{_id: 0, a: 1}, {_id: 1, a: 2}, {_id: 2, a: 3}];
+assert.commandWorked(coll.insert(testData));
+
+// Run the initial query and request to return a resume token. We will also request to return a
+// recordId for the document as it will
+// be used as a $_resumeAfter token later.
+let res = assert.commandWorked(localDb.runCommand({
+ find: oplogCollName,
+ filter: {op: "i", "o.a": {$gte: 1}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ showRecordId: true,
+ $_requestResumeToken: true
+}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert(res.cursor.firstBatch[0].hasOwnProperty("$recordId"));
+assert.hasFields(res.cursor, ["postBatchResumeToken"]);
+
+const firstResumeToken = res.cursor.postBatchResumeToken;
+const firstOplogBatch = res.cursor.firstBatch;
+
+res = assert.commandWorked(
+ localDb.runCommand({getMore: res.cursor.id, collection: oplogCollName, batchSize: 1}));
+assert.eq(1, res.cursor.nextBatch.length);
+assert(res.cursor.nextBatch[0].hasOwnProperty("$recordId"));
+assert.hasFields(res.cursor, ["postBatchResumeToken"]);
+
+const secondResumeToken = res.cursor.postBatchResumeToken;
+const secondOplogBatch = res.cursor.nextBatch;
+
+// Kill the cursor before attempting to resume.
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]}));
+
+// Try to resume the query from the saved resume token. This fails because '$_resumeAfter' expects a
+// '$recordId' resume token, not a 'ts' resume token. This is appropriate since resuming on the
+// oplog should occur by timestamp, not by record id.
+assert.commandFailedWithCode(localDb.runCommand({
+ find: oplogCollName,
+ filter: {ts: {$gte: firstResumeToken.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true,
+ $_resumeAfter: firstResumeToken
+}),
+ ErrorCodes.BadValue);
+
+// Now try to resume using a '$recordId' resume token, which doesn't make a ton of sense, but is
+// still allowed.
+res = assert.commandWorked(localDb.runCommand({
+ find: oplogCollName,
+ filter: {ts: {$gte: firstResumeToken.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ showRecordId: true,
+ $_requestResumeToken: true,
+ $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId}
+}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]);
+
+// Kill the cursor before attempting to resume.
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]}));
+
+res = assert.commandWorked(localDb.runCommand({
+ find: oplogCollName,
+ filter: {ts: {$lte: secondResumeToken.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ showRecordId: true,
+ $_requestResumeToken: true,
+ $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId}
+}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]);
+
+// Kill the cursor before attempting to resume.
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]}));
+
+res = assert.commandWorked(localDb.runCommand({
+ find: oplogCollName,
+ filter: {ts: {$eq: secondResumeToken.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ showRecordId: true,
+ $_requestResumeToken: true,
+ $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId}
+}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]);
+
+// Kill the cursor before attempting to resume.
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]}));
+
+res = assert.commandWorked(localDb.runCommand({
+ find: oplogCollName,
+ filter: {ts: {$gt: firstResumeToken.ts, $lte: secondResumeToken.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ showRecordId: true,
+ $_requestResumeToken: true,
+ $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId}
+}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]);
+
+// Kill the cursor before attempting to resume.
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]}));
+
+// Try to resume the query from a non-existent recordId and check that it fails to position the
+// cursor to the record specified in the resume token.
+assert.commandFailedWithCode(localDb.runCommand({
+ find: oplogCollName,
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true,
+ $_resumeAfter: {$recordId: NumberLong("50")}
+}),
+ ErrorCodes.KeyNotFound);
+
+// When we have a predicate on a 'ts' field of the oplog collection, we can build an optimized
+// collection scan. Make sure we can run such optimized scans and get the result.
+assert.eq(oplogColl.find({ts: {$gte: firstResumeToken.ts}}).itcount(), 3);
+assert.eq(oplogColl.find({ts: {$gt: firstResumeToken.ts}}).itcount(), 2);
+assert.gt(oplogColl.find({ts: {$lte: firstResumeToken.ts}}).itcount(), 1);
+assert.gt(oplogColl.find({ts: {$lt: firstResumeToken.ts}}).itcount(), 1);
+
+rst.stopSet();
+})();
diff --git a/jstests/noPassthroughWithMongod/query_oplogreplay.js b/jstests/noPassthroughWithMongod/query_oplogreplay.js
index 557de659681..e011d5f0791 100644
--- a/jstests/noPassthroughWithMongod/query_oplogreplay.js
+++ b/jstests/noPassthroughWithMongod/query_oplogreplay.js
@@ -105,7 +105,7 @@ assert(!cursor.hasNext());
let res = t.find({ts: {$eq: makeTS(10)}}).explain("executionStats");
assert.commandWorked(res);
// We expect to be able to seek directly to the entry with a 'ts' of 10.
-assert.lte(res.executionStats.totalDocsExamined, 2, tojson(res));
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 3 : 2, tojson(res));
let collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN");
assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res));
assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res));
@@ -113,7 +113,7 @@ assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res));
// An AND with an $lt predicate stops scanning after passing the max timestamp.
res = t.find({$and: [{ts: {$gte: makeTS(1)}}, {ts: {$lt: makeTS(10)}}]}).explain("executionStats");
assert.commandWorked(res);
-assert.lte(res.executionStats.totalDocsExamined, 11, tojson(res));
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 12 : 11, tojson(res));
collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN");
assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res));
assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res));
@@ -147,7 +147,7 @@ res = t.find({
}).explain("executionStats");
assert.commandWorked(res);
// We expect to be able to seek directly to the entry with a 'ts' of 5.
-assert.lte(res.executionStats.totalDocsExamined, 2, tojson(res));
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 3 : 2, tojson(res));
collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN");
assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res));
assert.eq(makeTS(5), longToTs(collScanStage.maxRecord), tojson(res));
@@ -157,7 +157,7 @@ assert.eq(makeTS(5), longToTs(collScanStage.minRecord), tojson(res));
res = t.find({ts: {$eq: makeTS(200)}}).explain("executionStats");
assert.commandWorked(res);
// We expect to be able to seek directly to the end of the oplog.
-assert.lte(res.executionStats.totalDocsExamined, 1, tojson(res));
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 2 : 1, tojson(res));
collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN");
assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res));
assert.eq(makeTS(200), longToTs(collScanStage.maxRecord), tojson(res));
@@ -169,7 +169,7 @@ res = t.find({
}).explain("executionStats");
assert.commandWorked(res);
// We expect to be able to seek directly to the start of the 'ts' range.
-assert.lte(res.executionStats.totalDocsExamined, 6, tojson(res));
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 7 : 6, tojson(res));
collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN");
assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res));
assert.eq(makeTS(8), longToTs(collScanStage.maxRecord), tojson(res));
@@ -202,18 +202,16 @@ while (res.hasNext()) {
}
res = res.explain("executionStats");
assert.commandWorked(res);
-// In SBE we perform an extra seek to position the cursor and apply the filter, so we will report
-// an extra document examined.
-assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 12 : 11, res);
+assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 13 : 11, res);
// Oplog replay optimization should work with limit.
res = t.find({$and: [{ts: {$gte: makeTS(4)}}, {ts: {$lte: makeTS(8)}}]})
.limit(2)
.explain("executionStats");
assert.commandWorked(res);
-assert.eq(2, res.executionStats.totalDocsExamined);
-collScanStage =
- getPlanStage(res.executionStats.executionStages, isSBEEnabled ? "seek" : "COLLSCAN");
+assert.eq(isSBEEnabled ? 3 : 2, res.executionStats.totalDocsExamined);
+collScanStage = isSBEEnabled ? getPlanStages(res.executionStats.executionStages, "seek")[1]
+ : getPlanStage(res.executionStats.executionStages, "COLLSCAN");
assert.eq(2, collScanStage.nReturned, res);
// A query over both 'ts' and '_id' should only pay attention to the 'ts' field for finding
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index 63c1a6aa697..bd045c33801 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -108,19 +108,139 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection,
return {};
}
+// If the scan should be started after the provided resume RecordId, we will construct a nested-loop
+// join sub-tree to project out the resume RecordId and feed it into the inner side (scan). We will
+// also construct a union sub-tree as an outer side of the loop join to implement the check that the
+// record we're trying to reposition the scan to exists.
+//
+// nlj [] [seekRecordIdSlot]
+// left
+// limit 1
+// union [seekRecordIdSlot]
+// [seekSlot]
+// nlj
+// left
+// project seekSlot = <seekRecordIdExpression>
+// limit 1
+// coscan
+// right
+// seek seekSlot ...
+// [unusedSlot]
+// project unusedSlot = efail(KeyNotFound)
+// limit 1
+// coscan
+// right
+// skip 1
+// <inputStage>
+std::unique_ptr<sbe::PlanStage> buildResumeFromRecordIdSubtree(
+ StageBuilderState& state,
+ const CollectionPtr& collection,
+ const CollectionScanNode* csn,
+ std::unique_ptr<sbe::PlanStage> inputStage,
+ sbe::value::SlotId seekRecordIdSlot,
+ std::unique_ptr<sbe::EExpression> seekRecordIdExpression,
+ PlanYieldPolicy* yieldPolicy,
+ bool isTailableResumeBranch,
+ bool resumeAfterRecordId,
+ sbe::LockAcquisitionCallback lockAcquisitionCallback) {
+ invariant(seekRecordIdExpression);
+
+ const auto forward = csn->direction == CollectionScanParams::FORWARD;
+ // Project out the RecordId we want to resume from as 'seekSlot'.
+ auto seekSlot = state.slotId();
+ auto projStage = sbe::makeProjectStage(
+ sbe::makeS<sbe::LimitSkipStage>(
+ sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
+ csn->nodeId(),
+ seekSlot,
+ std::move(seekRecordIdExpression));
+
+    // Construct a 'seek' branch of the 'union'. If we succeed in repositioning the cursor,
+ // the branch will output the 'seekSlot' to start the real scan from, otherwise it will
+ // produce EOF.
+ auto seekBranch =
+ sbe::makeS<sbe::LoopJoinStage>(std::move(projStage),
+ sbe::makeS<sbe::ScanStage>(collection->uuid(),
+ boost::none /* recordSlot */,
+ boost::none /* recordIdSlot*/,
+ boost::none /* snapshotIdSlot */,
+ boost::none /* indexIdSlot */,
+ boost::none /* indexKeySlot */,
+ boost::none /* keyPatternSlot */,
+ boost::none /* oplogTsSlot */,
+ std::vector<std::string>{},
+ sbe::makeSV(),
+ seekSlot,
+ forward,
+ yieldPolicy,
+ csn->nodeId(),
+ lockAcquisitionCallback),
+ sbe::makeSV(seekSlot),
+ sbe::makeSV(seekSlot),
+ nullptr,
+ csn->nodeId());
+
+ // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch must
+ // have the same number of slots, and we use just one in the 'seek' branch above. This branch
+ // will only be executed if the 'seek' branch produces EOF, which can only happen if the seek
+ // did not find the resume record of a tailable cursor or the record id specified in
+ // $_resumeAfter.
+ auto unusedSlot = state.slotId();
+ auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> {
+ if (isTailableResumeBranch) {
+ return {ErrorCodes::CappedPositionLost,
+ "CollectionScan died due to failure to restore tailable cursor position."};
+ }
+        return {ErrorCodes::KeyNotFound,
+                str::stream() << "Failed to resume collection scan: the recordId from which we are "
+ "attempting to resume no longer exists in the collection: "
+ << csn->resumeAfterRecordId};
+ }();
+ auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
+ csn->nodeId(),
+ unusedSlot,
+ sbe::makeE<sbe::EFail>(errorCode, errorMessage));
+
+    // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will only
+    // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it.
+ auto unionStage = sbe::makeS<sbe::UnionStage>(
+ makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), std::move(failBranch)),
+ std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)},
+ sbe::makeSV(seekRecordIdSlot),
+ csn->nodeId());
+
+ // Construct the final loop join. Note that for the resume branch of a tailable cursor case we
+ // use the 'seek' stage as an inner branch, since we need to produce all records starting from
+ // the supplied position. For a resume token case we also inject a 'skip 1' stage on top of the
+ // inner branch, as we need to start _after_ the resume RecordId. In both cases we inject a
+ // 'limit 1' stage on top of the outer branch, as it should produce just a single seek recordId.
+ auto innerStage = isTailableResumeBranch || !resumeAfterRecordId
+ ? std::move(inputStage)
+ : sbe::makeS<sbe::LimitSkipStage>(std::move(inputStage), boost::none, 1, csn->nodeId());
+ return sbe::makeS<sbe::LoopJoinStage>(
+ sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()),
+ std::move(innerStage),
+ sbe::makeSV(),
+ sbe::makeSV(seekRecordIdSlot),
+ nullptr,
+ csn->nodeId());
+}
+
/**
 * Creates a collection scan sub-tree optimized for oplog scans. We can build an optimized scan
- * when there is a predicted on the 'ts' field of the oplog collection.
+ * when any of the following scenarios apply:
*
- * 1. If a lower bound on 'ts' is present, the collection scan will seek directly to the RecordId
- * of an oplog entry as close to this lower bound as possible without going higher.
- * 1.1 If the query is just a lower bound on 'ts' on a forward scan, every document in the
- * collection after the first matching one must also match. To avoid wasting time
- * running the filter on every document to be returned, we will stop applying the filter
- * once it finds the first match.
- * 2. If an upper bound on 'ts' is present, the collection scan will stop and return EOF the first
- * time it fetches a document that does not pass the filter and has 'ts' greater than the upper
- * bound.
+ * 1. There is a predicate on the 'ts' field of the oplog collection.
+ * 1.1 If a lower bound on 'ts' is present, the collection scan will seek directly to the
+ * RecordId of an oplog entry as close to this lower bound as possible without going higher.
+ * 1.2 If the query is *only* a lower bound on 'ts' on a forward scan, every document in the
+ * collection after the first matching one must also match. To avoid wasting time running the
+ * filter on every document to be returned, we will stop applying the filter once it finds
+ * the first match.
+ * 1.3 If an upper bound on 'ts' is present, the collection scan will stop and return EOF the
+ * first time it fetches a document that does not pass the filter and has 'ts' greater than
+ * the upper bound.
+ * 2. The user request specified a $_resumeAfter recordId from which to begin the scan.
*/
std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplogScan(
StageBuilderState& state,
@@ -130,9 +250,11 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
bool isTailableResumeBranch,
sbe::LockAcquisitionCallback lockAcquisitionCallback) {
invariant(collection->ns().isOplog());
- // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can
- // only be done for a forward scan.
- invariant(!csn->resumeAfterRecordId);
+ // We can apply oplog scan optimizations only when at least one of the following was specified.
+ invariant(csn->resumeAfterRecordId || csn->minRecord || csn->maxRecord);
+ // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId.
+ invariant(!(csn->resumeAfterRecordId && (csn->minRecord || csn->maxRecord)));
+ // Oplog scan optimizations can only be done for a forward scan.
invariant(csn->direction == CollectionScanParams::FORWARD);
auto resultSlot = state.slotId();
@@ -142,17 +264,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// Otherwise, if we're building a collection scan for a resume branch of a special union
// sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly
// to access the recordId to resume the scan from.
- auto [seekRecordId, seekRecordIdSlot] =
- [&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> {
+ auto [seekRecordIdSlot, seekRecordIdExpression] =
+ [&]() -> std::pair<boost::optional<sbe::value::SlotId>, std::unique_ptr<sbe::EExpression>> {
if (isTailableResumeBranch) {
auto resumeRecordIdSlot = state.env->getSlot("resumeRecordId"_sd);
- return {{}, resumeRecordIdSlot};
+ return {resumeRecordIdSlot, makeVariable(resumeRecordIdSlot)};
+ } else if (csn->resumeAfterRecordId) {
+ return {
+ state.slotId(),
+ makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong())};
} else if (csn->minRecord) {
auto cursor = collection->getRecordStore()->getCursor(state.opCtx);
auto startRec = cursor->seekNear(*csn->minRecord);
if (startRec) {
LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
- return {startRec->id, state.slotId()};
+ return {state.slotId(),
+ makeConstant(sbe::value::TypeTags::RecordId, startRec->id.getLong())};
}
}
return {};
@@ -185,22 +312,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
std::move(callbacks));
// Start the scan from the seekRecordId.
- if (seekRecordId) {
- invariant(seekRecordIdSlot);
-
- // Project the start RecordId as a seekRecordIdSlot and feed it to the inner side (scan).
- stage = sbe::makeS<sbe::LoopJoinStage>(
- sbe::makeProjectStage(
- sbe::makeS<sbe::LimitSkipStage>(
- sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
- csn->nodeId(),
- *seekRecordIdSlot,
- makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->getLong())),
- std::move(stage),
- sbe::makeSV(),
- sbe::makeSV(*seekRecordIdSlot),
- nullptr,
- csn->nodeId());
+ if (seekRecordIdSlot) {
+ stage = buildResumeFromRecordIdSubtree(state,
+ collection,
+ csn,
+ std::move(stage),
+ *seekRecordIdSlot,
+ std::move(seekRecordIdExpression),
+ yieldPolicy,
+ isTailableResumeBranch,
+ csn->resumeAfterRecordId.has_value(),
+ lockAcquisitionCallback);
}
// Create a filter which checks the first document to ensure either that its 'ts' is less than
@@ -363,6 +485,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// inner branch, and the execution will continue from this point further on, without
// applying the filter.
if (csn->stopApplyingFilterAfterFirstMatch) {
+ invariant(!csn->maxRecord);
invariant(csn->direction == CollectionScanParams::FORWARD);
seekRecordIdSlot = recordIdSlot;
@@ -464,92 +587,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
csn->nodeId(),
std::move(callbacks));
- // Check if the scan should be started after the provided resume RecordId and construct a nested
- // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to
- // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop
- // join to implement the check that the record we're trying to reposition the scan exists.
if (seekRecordIdSlot) {
- // Project out the RecordId we want to resume from as 'seekSlot'.
- auto seekSlot = state.slotId();
- auto projStage = sbe::makeProjectStage(
- sbe::makeS<sbe::LimitSkipStage>(
- sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
- csn->nodeId(),
- seekSlot,
- std::move(seekRecordIdExpression));
-
- // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
- // the branch will output the 'seekSlot' to start the real scan from, otherwise it will
- // produce EOF.
- auto seekBranch = sbe::makeS<sbe::LoopJoinStage>(
- std::move(projStage),
- sbe::makeS<sbe::ScanStage>(collection->uuid(),
- boost::none /* recordSlot */,
- boost::none /* recordIdSlot*/,
- boost::none /* snapshotIdSlot */,
- boost::none /* indexIdSlot */,
- boost::none /* indexKeySlot */,
- boost::none /* keyPatternSlot */,
- boost::none /* oplogTsSlot */,
- std::vector<std::string>{},
- sbe::makeSV(),
- seekSlot,
- forward,
- yieldPolicy,
- csn->nodeId(),
- lockAcquisitionCallback),
- sbe::makeSV(seekSlot),
- sbe::makeSV(seekSlot),
- nullptr,
- csn->nodeId());
-
- // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch
- // must have the same number of slots, and we use just one in the 'seek' branch above. This
- // branch will only be executed if the 'seek' branch produces EOF, which can only happen if
- // the seek did not find the resume record of a tailable cursor or the record id specified
- // in $_resumeAfter.
- auto unusedSlot = state.slotId();
- auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> {
- if (isTailableResumeBranch) {
- return {ErrorCodes::CappedPositionLost,
- "CollectionScan died due to failure to restore tailable cursor position."};
- }
- return {
- ErrorCodes::ErrorCodes::KeyNotFound,
- str::stream() << "Failed to resume collection scan the recordId from which we are "
- "attempting to resume no longer exists in the collection: "
- << csn->resumeAfterRecordId};
- }();
- auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
- csn->nodeId(),
- unusedSlot,
- sbe::makeE<sbe::EFail>(errorCode, errorMessage));
-
- // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will
- // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it.
- auto unionStage = sbe::makeS<sbe::UnionStage>(
- makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch),
- std::move(failBranch)),
- std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)},
- sbe::makeSV(*seekRecordIdSlot),
- csn->nodeId());
-
- // Construct the final loop join. Note that for the resume branch of a tailable cursor case
- // we use the 'seek' stage as an inner branch, since we need to produce all records starting
- // from the supplied position. For a resume token case we also inject a 'skip 1' stage on
- // top of the inner branch, as we need to start _after_ the resume RecordId. In both cases
- // we inject a 'limit 1' stage on top of the outer branch, as it should produce just a
- // single seek recordId.
- auto innerStage = isTailableResumeBranch
- ? std::move(stage)
- : sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId());
- stage = sbe::makeS<sbe::LoopJoinStage>(
- sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()),
- std::move(innerStage),
- sbe::makeSV(),
- sbe::makeSV(*seekRecordIdSlot),
- nullptr,
- csn->nodeId());
+ stage = buildResumeFromRecordIdSubtree(state,
+ collection,
+ csn,
+ std::move(stage),
+ *seekRecordIdSlot,
+ std::move(seekRecordIdExpression),
+ yieldPolicy,
+ isTailableResumeBranch,
+ true, /* resumeAfterRecordId */
+ lockAcquisitionCallback);
}
if (csn->filter) {