diff options
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 68 |
1 files changed, 56 insertions, 12 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 05f9bcefb96..1a338abf238 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -41,6 +41,7 @@ #include "mongo/db/exec/sbe/stages/loop_join.h" #include "mongo/db/exec/sbe/stages/project.h" #include "mongo/db/exec/sbe/stages/scan.h" +#include "mongo/db/exec/sbe/stages/union.h" #include "mongo/db/query/sbe_stage_builder_filter.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/storage/oplog_hack.h" @@ -330,20 +331,63 @@ generateGenericCollScan(const Collection* collection, // Check if the scan should be started after the provided resume RecordId and construct a nested // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to - // the inner side (scan). - // - // Note that we also inject a 'skip 1' stage on top of the inner branch, as we need to start - // _after_ the resume RecordId. - // - // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on - // seekRecordIdSlot. + // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop + // join to implement the check that the record we're trying to reposition the scan exists. if (seekRecordIdSlot && !isTailableResumeBranch) { + // Project out the RecordId we want to resume from as 'seekSlot'. + auto seekSlot = slotIdGenerator->generate(); + auto projStage = sbe::makeProjectStage( + sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none), + seekSlot, + sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64, + csn->resumeAfterRecordId->repr())); + + // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, + // the branch will output the 'seekSlot' to start the real scan from, otherwise it will + // produce EOF. + auto seekBranch = + sbe::makeS<sbe::LoopJoinStage>(std::move(projStage), + sbe::makeS<sbe::ScanStage>(nss, + boost::none, + boost::none, + std::vector<std::string>{}, + sbe::makeSV(), + seekSlot, + forward, + yieldPolicy, + tracker), + + sbe::makeSV(seekSlot), + sbe::makeSV(seekSlot), + nullptr); + + // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch + // must have the same number of slots, and we use just one in the 'seek' branch above. This + // branch will only be executed if the 'seek' branch produces EOF, which can only happen if + // if the seek did not find the record id specified in $_resumeAfter. + auto unusedSlot = slotIdGenerator->generate(); + auto failBranch = sbe::makeProjectStage( + sbe::makeS<sbe::CoScanStage>(), + unusedSlot, + sbe::makeE<sbe::EFail>( + ErrorCodes::KeyNotFound, + str::stream() << "Failed to resume collection scan: the recordId from which we are " + << "attempting to resume no longer exists in the collection: " + << csn->resumeAfterRecordId)); + + // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will + // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it. + auto unionStage = sbe::makeS<sbe::UnionStage>( + make_vector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), + std::move(failBranch)), + std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, + sbe::makeSV(*seekRecordIdSlot)); + + // Construct the final loop join. Note that we also inject a 'skip 1' stage on top of the + // inner branch, as we need to start _after_ the resume RecordId, and a 'limit 1' stage on + // top of the outer branch, as it should produce just a single seek recordId. stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none), - *seekRecordIdSlot, - sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64, - csn->resumeAfterRecordId->repr())), + sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none), sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1), sbe::makeSV(), sbe::makeSV(*seekRecordIdSlot), |