summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp68
1 files changed, 56 insertions, 12 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index 05f9bcefb96..1a338abf238 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/exec/sbe/stages/loop_join.h"
#include "mongo/db/exec/sbe/stages/project.h"
#include "mongo/db/exec/sbe/stages/scan.h"
+#include "mongo/db/exec/sbe/stages/union.h"
#include "mongo/db/query/sbe_stage_builder_filter.h"
#include "mongo/db/query/util/make_data_structure.h"
#include "mongo/db/storage/oplog_hack.h"
@@ -330,20 +331,63 @@ generateGenericCollScan(const Collection* collection,
// Check if the scan should be started after the provided resume RecordId and construct a nested
// loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to
- // the inner side (scan).
- //
- // Note that we also inject a 'skip 1' stage on top of the inner branch, as we need to start
- // _after_ the resume RecordId.
- //
- // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on
- // seekRecordIdSlot.
+ // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop
+ // join to implement the check that the record we're trying to reposition the scan exists.
if (seekRecordIdSlot && !isTailableResumeBranch) {
+ // Project out the RecordId we want to resume from as 'seekSlot'.
+ auto seekSlot = slotIdGenerator->generate();
+ auto projStage = sbe::makeProjectStage(
+ sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none),
+ seekSlot,
+ sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64,
+ csn->resumeAfterRecordId->repr()));
+
+ // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
+ // the branch will output the 'seekSlot' to start the real scan from, otherwise it will
+ // produce EOF.
+ auto seekBranch =
+ sbe::makeS<sbe::LoopJoinStage>(std::move(projStage),
+ sbe::makeS<sbe::ScanStage>(nss,
+ boost::none,
+ boost::none,
+ std::vector<std::string>{},
+ sbe::makeSV(),
+ seekSlot,
+ forward,
+ yieldPolicy,
+ tracker),
+
+ sbe::makeSV(seekSlot),
+ sbe::makeSV(seekSlot),
+ nullptr);
+
+ // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch
+ // must have the same number of slots, and we use just one in the 'seek' branch above. This
+ // branch will only be executed if the 'seek' branch produces EOF, which can only happen if
+ // if the seek did not find the record id specified in $_resumeAfter.
+ auto unusedSlot = slotIdGenerator->generate();
+ auto failBranch = sbe::makeProjectStage(
+ sbe::makeS<sbe::CoScanStage>(),
+ unusedSlot,
+ sbe::makeE<sbe::EFail>(
+ ErrorCodes::KeyNotFound,
+ str::stream() << "Failed to resume collection scan: the recordId from which we are "
+ << "attempting to resume no longer exists in the collection: "
+ << csn->resumeAfterRecordId));
+
+ // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will
+ // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it.
+ auto unionStage = sbe::makeS<sbe::UnionStage>(
+ make_vector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch),
+ std::move(failBranch)),
+ std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)},
+ sbe::makeSV(*seekRecordIdSlot));
+
+ // Construct the final loop join. Note that we also inject a 'skip 1' stage on top of the
+ // inner branch, as we need to start _after_ the resume RecordId, and a 'limit 1' stage on
+ // top of the outer branch, as it should produce just a single seek recordId.
stage = sbe::makeS<sbe::LoopJoinStage>(
- sbe::makeProjectStage(
- sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none),
- *seekRecordIdSlot,
- sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64,
- csn->resumeAfterRecordId->repr())),
+ sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none),
sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1),
sbe::makeSV(),
sbe::makeSV(*seekRecordIdSlot),