diff options
author | Anton Korshunov <anton.korshunov@mongodb.com> | 2021-06-01 12:22:25 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-03 08:02:19 +0000 |
commit | 1aa41b5625ae0ac2e757f3638585e20d7d81e7ca (patch) | |
tree | 76ca00b164415dc1d3d1243179906bfa50255808 | |
parent | dcd9f60e591acffeedabb5fa368f407a035473b2 (diff) | |
download | mongo-1aa41b5625ae0ac2e757f3638585e20d7d81e7ca.tar.gz |
SERVER-57317 Fix optimized oplog scans in SBE to correctly handle $_resumeAfter queries
3 files changed, 327 insertions, 129 deletions
diff --git a/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js b/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js new file mode 100644 index 00000000000..e62fa2ff094 --- /dev/null +++ b/jstests/noPassthrough/oplog_scan_optimizations_do_not_conflict.js @@ -0,0 +1,152 @@ +/** + * Test that optimized collection scan on the oplog collection can be applied and executed + * successfully. + * @tags: [ + * requires_replication, + * ] + */ +(function() { +"use strict"; + +const rst = new ReplSetTest({nodes: 2}); +rst.startSet(); +rst.initiate(); + +const db = rst.getPrimary().getDB("oplog_scan_optimizations"); +const localDb = rst.getPrimary().getDB("local"); +const collName = "oplog_scan_optimizations"; +const oplogCollName = "oplog.rs"; +const coll = db[collName]; +const oplogColl = localDb[oplogCollName]; + +coll.drop(); + +// Insert several document so that we can use a cursor to fetch them in multiple batches. +const testData = [{_id: 0, a: 1}, {_id: 1, a: 2}, {_id: 2, a: 3}]; +assert.commandWorked(coll.insert(testData)); + +// Run the initial query and request to return a resume token. We will also request to return a +// recordId for the document as it will +// be used as a $_resumeAfter token later. 
+let res = assert.commandWorked(localDb.runCommand({ + find: oplogCollName, + filter: {op: "i", "o.a": {$gte: 1}}, + hint: {$natural: 1}, + batchSize: 1, + showRecordId: true, + $_requestResumeToken: true +})); +assert.eq(1, res.cursor.firstBatch.length); +assert(res.cursor.firstBatch[0].hasOwnProperty("$recordId")); +assert.hasFields(res.cursor, ["postBatchResumeToken"]); + +const firstResumeToken = res.cursor.postBatchResumeToken; +const firstOplogBatch = res.cursor.firstBatch; + +res = assert.commandWorked( + localDb.runCommand({getMore: res.cursor.id, collection: oplogCollName, batchSize: 1})); +assert.eq(1, res.cursor.nextBatch.length); +assert(res.cursor.nextBatch[0].hasOwnProperty("$recordId")); +assert.hasFields(res.cursor, ["postBatchResumeToken"]); + +const secondResumeToken = res.cursor.postBatchResumeToken; +const secondOplogBatch = res.cursor.nextBatch; + +// Kill the cursor before attempting to resume. +assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]})); + +// Try to resume the query from the saved resume token. This fails because '$_resumeAfter' expects a +// '$recordId' resume token, not a 'ts' resume token. This is appropriate since resuming on the +// oplog should occur by timestamp, not by record id. +assert.commandFailedWithCode(localDb.runCommand({ + find: oplogCollName, + filter: {ts: {$gte: firstResumeToken.ts}}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: firstResumeToken +}), + ErrorCodes.BadValue); + +// Now try to resume using a '$recordId' resume token, which doesn't make a ton of sense, but is +// still allowed. 
+res = assert.commandWorked(localDb.runCommand({ + find: oplogCollName, + filter: {ts: {$gte: firstResumeToken.ts}}, + hint: {$natural: 1}, + batchSize: 1, + showRecordId: true, + $_requestResumeToken: true, + $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId} +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. +assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]})); + +res = assert.commandWorked(localDb.runCommand({ + find: oplogCollName, + filter: {ts: {$lte: secondResumeToken.ts}}, + hint: {$natural: 1}, + batchSize: 1, + showRecordId: true, + $_requestResumeToken: true, + $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId} +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. +assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]})); + +res = assert.commandWorked(localDb.runCommand({ + find: oplogCollName, + filter: {ts: {$eq: secondResumeToken.ts}}, + hint: {$natural: 1}, + batchSize: 1, + showRecordId: true, + $_requestResumeToken: true, + $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId} +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. 
+assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]})); + +res = assert.commandWorked(localDb.runCommand({ + find: oplogCollName, + filter: {ts: {$gt: firstResumeToken.ts, $lte: secondResumeToken.ts}}, + hint: {$natural: 1}, + batchSize: 1, + showRecordId: true, + $_requestResumeToken: true, + $_resumeAfter: {$recordId: firstOplogBatch[0].$recordId} +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.eq(secondOplogBatch[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. +assert.commandWorked(localDb.runCommand({killCursors: oplogCollName, cursors: [res.cursor.id]})); + +// Try to resume the query from a non-existent recordId and check that it fails to position the +// cursor to the record specified in the resume token. +assert.commandFailedWithCode(localDb.runCommand({ + find: oplogCollName, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: {$recordId: NumberLong("50")} +}), + ErrorCodes.KeyNotFound); + +// When we have a predicate on a 'ts' field of the oplog collection, we can build an optimized +// collection scan. Make sure we can run such optimized scans and get the result. 
+assert.eq(oplogColl.find({ts: {$gte: firstResumeToken.ts}}).itcount(), 3); +assert.eq(oplogColl.find({ts: {$gt: firstResumeToken.ts}}).itcount(), 2); +assert.gt(oplogColl.find({ts: {$lte: firstResumeToken.ts}}).itcount(), 1); +assert.gt(oplogColl.find({ts: {$lt: firstResumeToken.ts}}).itcount(), 1); + +rst.stopSet(); +})(); diff --git a/jstests/noPassthroughWithMongod/query_oplogreplay.js b/jstests/noPassthroughWithMongod/query_oplogreplay.js index 557de659681..e011d5f0791 100644 --- a/jstests/noPassthroughWithMongod/query_oplogreplay.js +++ b/jstests/noPassthroughWithMongod/query_oplogreplay.js @@ -105,7 +105,7 @@ assert(!cursor.hasNext()); let res = t.find({ts: {$eq: makeTS(10)}}).explain("executionStats"); assert.commandWorked(res); // We expect to be able to seek directly to the entry with a 'ts' of 10. -assert.lte(res.executionStats.totalDocsExamined, 2, tojson(res)); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 3 : 2, tojson(res)); let collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN"); assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res)); assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res)); @@ -113,7 +113,7 @@ assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res)); // An AND with an $lt predicate stops scanning after passing the max timestamp. res = t.find({$and: [{ts: {$gte: makeTS(1)}}, {ts: {$lt: makeTS(10)}}]}).explain("executionStats"); assert.commandWorked(res); -assert.lte(res.executionStats.totalDocsExamined, 11, tojson(res)); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 
12 : 11, tojson(res)); collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN"); assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res)); assert.eq(makeTS(10), longToTs(collScanStage.maxRecord), tojson(res)); @@ -147,7 +147,7 @@ res = t.find({ }).explain("executionStats"); assert.commandWorked(res); // We expect to be able to seek directly to the entry with a 'ts' of 5. -assert.lte(res.executionStats.totalDocsExamined, 2, tojson(res)); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 3 : 2, tojson(res)); collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN"); assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res)); assert.eq(makeTS(5), longToTs(collScanStage.maxRecord), tojson(res)); @@ -157,7 +157,7 @@ assert.eq(makeTS(5), longToTs(collScanStage.minRecord), tojson(res)); res = t.find({ts: {$eq: makeTS(200)}}).explain("executionStats"); assert.commandWorked(res); // We expect to be able to seek directly to the end of the oplog. -assert.lte(res.executionStats.totalDocsExamined, 1, tojson(res)); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 2 : 1, tojson(res)); collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN"); assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res)); assert.eq(makeTS(200), longToTs(collScanStage.maxRecord), tojson(res)); @@ -169,7 +169,7 @@ res = t.find({ }).explain("executionStats"); assert.commandWorked(res); // We expect to be able to seek directly to the start of the 'ts' range. -assert.lte(res.executionStats.totalDocsExamined, 6, tojson(res)); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 
7 : 6, tojson(res)); collScanStage = getPlanStage(getWinningPlan(res.queryPlanner), "COLLSCAN"); assert.neq(null, collScanStage, "no collection scan found in explain output: " + tojson(res)); assert.eq(makeTS(8), longToTs(collScanStage.maxRecord), tojson(res)); @@ -202,18 +202,16 @@ while (res.hasNext()) { } res = res.explain("executionStats"); assert.commandWorked(res); -// In SBE we perform an extra seek to position the cursor and apply the filter, so we will report -// an extra document examined. -assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 12 : 11, res); +assert.lte(res.executionStats.totalDocsExamined, isSBEEnabled ? 13 : 11, res); // Oplog replay optimization should work with limit. res = t.find({$and: [{ts: {$gte: makeTS(4)}}, {ts: {$lte: makeTS(8)}}]}) .limit(2) .explain("executionStats"); assert.commandWorked(res); -assert.eq(2, res.executionStats.totalDocsExamined); -collScanStage = - getPlanStage(res.executionStats.executionStages, isSBEEnabled ? "seek" : "COLLSCAN"); +assert.eq(isSBEEnabled ? 3 : 2, res.executionStats.totalDocsExamined); +collScanStage = isSBEEnabled ? getPlanStages(res.executionStats.executionStages, "seek")[1] + : getPlanStage(res.executionStats.executionStages, "COLLSCAN"); assert.eq(2, collScanStage.nReturned, res); // A query over both 'ts' and '_id' should only pay attention to the 'ts' field for finding diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 63c1a6aa697..bd045c33801 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -108,19 +108,139 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection, return {}; } +// If the scan should be started after the provided resume RecordId, we will construct a nested-loop +// join sub-tree to project out the resume RecordId and feed it into the inner side (scan). 
We will +// also construct a union sub-tree as an outer side of the loop join to implement the check that the +// record we're trying to reposition the scan exists. +// +// nlj [] [seekRecordIdSlot] +// left +// limit 1 +// union [seekRecordIdSlot] +// [seekSlot] +// nlj +// left +// project seekSlot = <seekRecordIdExpression> +// limit 1 +// coscan +// right +// seek seekSlot ... +// [unusedSlot] +// project unusedSlot = efail(KeyNotFound) +// limit 1 +// coscan +// right +// skip 1 +// <inputStage> +std::unique_ptr<sbe::PlanStage> buildResumeFromRecordIdSubtree( + StageBuilderState& state, + const CollectionPtr& collection, + const CollectionScanNode* csn, + std::unique_ptr<sbe::PlanStage> inputStage, + sbe::value::SlotId seekRecordIdSlot, + std::unique_ptr<sbe::EExpression> seekRecordIdExpression, + PlanYieldPolicy* yieldPolicy, + bool isTailableResumeBranch, + bool resumeAfterRecordId, + sbe::LockAcquisitionCallback lockAcquisitionCallback) { + invariant(seekRecordIdExpression); + + const auto forward = csn->direction == CollectionScanParams::FORWARD; + // Project out the RecordId we want to resume from as 'seekSlot'. + auto seekSlot = state.slotId(); + auto projStage = sbe::makeProjectStage( + sbe::makeS<sbe::LimitSkipStage>( + sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), + csn->nodeId(), + seekSlot, + std::move(seekRecordIdExpression)); + + // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, + // the branch will output the 'seekSlot' to start the real scan from, otherwise it will + // produce EOF. 
+ auto seekBranch = + sbe::makeS<sbe::LoopJoinStage>(std::move(projStage), + sbe::makeS<sbe::ScanStage>(collection->uuid(), + boost::none /* recordSlot */, + boost::none /* recordIdSlot*/, + boost::none /* snapshotIdSlot */, + boost::none /* indexIdSlot */, + boost::none /* indexKeySlot */, + boost::none /* keyPatternSlot */, + boost::none /* oplogTsSlot */, + std::vector<std::string>{}, + sbe::makeSV(), + seekSlot, + forward, + yieldPolicy, + csn->nodeId(), + lockAcquisitionCallback), + sbe::makeSV(seekSlot), + sbe::makeSV(seekSlot), + nullptr, + csn->nodeId()); + + // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch must + // have the same number of slots, and we use just one in the 'seek' branch above. This branch + // will only be executed if the 'seek' branch produces EOF, which can only happen if the seek + // did not find the resume record of a tailable cursor or the record id specified in + // $_resumeAfter. + auto unusedSlot = state.slotId(); + auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> { + if (isTailableResumeBranch) { + return {ErrorCodes::CappedPositionLost, + "CollectionScan died due to failure to restore tailable cursor position."}; + } + return {ErrorCodes::ErrorCodes::KeyNotFound, + str::stream() << "Failed to resume collection scan the recordId from which we are " + "attempting to resume no longer exists in the collection: " + << csn->resumeAfterRecordId}; + }(); + auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()), + csn->nodeId(), + unusedSlot, + sbe::makeE<sbe::EFail>(errorCode, errorMessage)); + + // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will ever + // produce a single call to getNext() due to a 'limit 1' sitting on top of it. 
+ auto unionStage = sbe::makeS<sbe::UnionStage>( + makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), std::move(failBranch)), + std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, + sbe::makeSV(seekRecordIdSlot), + csn->nodeId()); + + // Construct the final loop join. Note that for the resume branch of a tailable cursor case we + // use the 'seek' stage as an inner branch, since we need to produce all records starting from + // the supplied position. For a resume token case we also inject a 'skip 1' stage on top of the + // inner branch, as we need to start _after_ the resume RecordId. In both cases we inject a + // 'limit 1' stage on top of the outer branch, as it should produce just a single seek recordId. + auto innerStage = isTailableResumeBranch || !resumeAfterRecordId + ? std::move(inputStage) + : sbe::makeS<sbe::LimitSkipStage>(std::move(inputStage), boost::none, 1, csn->nodeId()); + return sbe::makeS<sbe::LoopJoinStage>( + sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()), + std::move(innerStage), + sbe::makeSV(), + sbe::makeSV(seekRecordIdSlot), + nullptr, + csn->nodeId()); +} + /** * Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan - * when there is a predicted on the 'ts' field of the oplog collection. + * when any of the following scenarios apply: * - * 1. If a lower bound on 'ts' is present, the collection scan will seek directly to the RecordId - * of an oplog entry as close to this lower bound as possible without going higher. - * 1.1 If the query is just a lower bound on 'ts' on a forward scan, every document in the - * collection after the first matching one must also match. To avoid wasting time - * running the filter on every document to be returned, we will stop applying the filter - * once it finds the first match. - * 2. 
If an upper bound on 'ts' is present, the collection scan will stop and return EOF the first - * time it fetches a document that does not pass the filter and has 'ts' greater than the upper - * bound. + * 1. There is a predicate on the 'ts' field of the oplog collection. + * 1.1 If a lower bound on 'ts' is present, the collection scan will seek directly to the + * RecordId of an oplog entry as close to this lower bound as possible without going higher. + * 1.2 If the query is *only* a lower bound on 'ts' on a forward scan, every document in the + * collection after the first matching one must also match. To avoid wasting time running the + * filter on every document to be returned, we will stop applying the filter once it finds + * the first match. + * 1.3 If an upper bound on 'ts' is present, the collection scan will stop and return EOF the + * first time it fetches a document that does not pass the filter and has 'ts' greater than + * the upper bound. + * 2. The user request specified a $_resumeAfter recordId from which to begin the scan. */ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplogScan( StageBuilderState& state, @@ -130,9 +250,11 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo bool isTailableResumeBranch, sbe::LockAcquisitionCallback lockAcquisitionCallback) { invariant(collection->ns().isOplog()); - // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can - // only be done for a forward scan. - invariant(!csn->resumeAfterRecordId); + // We can apply oplog scan optimizations only when at least one of the following was specified. + invariant(csn->resumeAfterRecordId || csn->minRecord || csn->maxRecord); + // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId. + invariant(!(csn->resumeAfterRecordId && (csn->minRecord || csn->maxRecord))); + // Oplog scan optimizations can only be done for a forward scan. 
invariant(csn->direction == CollectionScanParams::FORWARD); auto resultSlot = state.slotId(); @@ -142,17 +264,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // Otherwise, if we're building a collection scan for a resume branch of a special union // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly // to access the recordId to resume the scan from. - auto [seekRecordId, seekRecordIdSlot] = - [&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> { + auto [seekRecordIdSlot, seekRecordIdExpression] = + [&]() -> std::pair<boost::optional<sbe::value::SlotId>, std::unique_ptr<sbe::EExpression>> { if (isTailableResumeBranch) { auto resumeRecordIdSlot = state.env->getSlot("resumeRecordId"_sd); - return {{}, resumeRecordIdSlot}; + return {resumeRecordIdSlot, makeVariable(resumeRecordIdSlot)}; + } else if (csn->resumeAfterRecordId) { + return { + state.slotId(), + makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong())}; } else if (csn->minRecord) { auto cursor = collection->getRecordStore()->getCursor(state.opCtx); auto startRec = cursor->seekNear(*csn->minRecord); if (startRec) { LOGV2_DEBUG(205841, 3, "Using direct oplog seek"); - return {startRec->id, state.slotId()}; + return {state.slotId(), + makeConstant(sbe::value::TypeTags::RecordId, startRec->id.getLong())}; } } return {}; @@ -185,22 +312,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo std::move(callbacks)); // Start the scan from the seekRecordId. - if (seekRecordId) { - invariant(seekRecordIdSlot); - - // Project the start RecordId as a seekRecordIdSlot and feed it to the inner side (scan). 
- stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>( - sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), - csn->nodeId(), - *seekRecordIdSlot, - makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->getLong())), - std::move(stage), - sbe::makeSV(), - sbe::makeSV(*seekRecordIdSlot), - nullptr, - csn->nodeId()); + if (seekRecordIdSlot) { + stage = buildResumeFromRecordIdSubtree(state, + collection, + csn, + std::move(stage), + *seekRecordIdSlot, + std::move(seekRecordIdExpression), + yieldPolicy, + isTailableResumeBranch, + csn->resumeAfterRecordId.has_value(), + lockAcquisitionCallback); } // Create a filter which checks the first document to ensure either that its 'ts' is less than @@ -363,6 +485,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // inner branch, and the execution will continue from this point further on, without // applying the filter. if (csn->stopApplyingFilterAfterFirstMatch) { + invariant(!csn->maxRecord); invariant(csn->direction == CollectionScanParams::FORWARD); seekRecordIdSlot = recordIdSlot; @@ -464,92 +587,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc csn->nodeId(), std::move(callbacks)); - // Check if the scan should be started after the provided resume RecordId and construct a nested - // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to - // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop - // join to implement the check that the record we're trying to reposition the scan exists. if (seekRecordIdSlot) { - // Project out the RecordId we want to resume from as 'seekSlot'. 
- auto seekSlot = state.slotId(); - auto projStage = sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>( - sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), - csn->nodeId(), - seekSlot, - std::move(seekRecordIdExpression)); - - // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, - // the branch will output the 'seekSlot' to start the real scan from, otherwise it will - // produce EOF. - auto seekBranch = sbe::makeS<sbe::LoopJoinStage>( - std::move(projStage), - sbe::makeS<sbe::ScanStage>(collection->uuid(), - boost::none /* recordSlot */, - boost::none /* recordIdSlot*/, - boost::none /* snapshotIdSlot */, - boost::none /* indexIdSlot */, - boost::none /* indexKeySlot */, - boost::none /* keyPatternSlot */, - boost::none /* oplogTsSlot */, - std::vector<std::string>{}, - sbe::makeSV(), - seekSlot, - forward, - yieldPolicy, - csn->nodeId(), - lockAcquisitionCallback), - sbe::makeSV(seekSlot), - sbe::makeSV(seekSlot), - nullptr, - csn->nodeId()); - - // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch - // must have the same number of slots, and we use just one in the 'seek' branch above. This - // branch will only be executed if the 'seek' branch produces EOF, which can only happen if - // the seek did not find the resume record of a tailable cursor or the record id specified - // in $_resumeAfter. 
- auto unusedSlot = state.slotId(); - auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> { - if (isTailableResumeBranch) { - return {ErrorCodes::CappedPositionLost, - "CollectionScan died due to failure to restore tailable cursor position."}; - } - return { - ErrorCodes::ErrorCodes::KeyNotFound, - str::stream() << "Failed to resume collection scan the recordId from which we are " - "attempting to resume no longer exists in the collection: " - << csn->resumeAfterRecordId}; - }(); - auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()), - csn->nodeId(), - unusedSlot, - sbe::makeE<sbe::EFail>(errorCode, errorMessage)); - - // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will - // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it. - auto unionStage = sbe::makeS<sbe::UnionStage>( - makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), - std::move(failBranch)), - std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, - sbe::makeSV(*seekRecordIdSlot), - csn->nodeId()); - - // Construct the final loop join. Note that for the resume branch of a tailable cursor case - // we use the 'seek' stage as an inner branch, since we need to produce all records starting - // from the supplied position. For a resume token case we also inject a 'skip 1' stage on - // top of the inner branch, as we need to start _after_ the resume RecordId. In both cases - // we inject a 'limit 1' stage on top of the outer branch, as it should produce just a - // single seek recordId. - auto innerStage = isTailableResumeBranch - ? 
std::move(stage) - : sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId()); - stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()), - std::move(innerStage), - sbe::makeSV(), - sbe::makeSV(*seekRecordIdSlot), - nullptr, - csn->nodeId()); + stage = buildResumeFromRecordIdSubtree(state, + collection, + csn, + std::move(stage), + *seekRecordIdSlot, + std::move(seekRecordIdExpression), + yieldPolicy, + isTailableResumeBranch, + true, /* resumeAfterRecordId */ + lockAcquisitionCallback); } if (csn->filter) { |