summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
diff options
context:
space:
mode:
authorMindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>2021-04-21 10:16:32 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-27 14:44:07 +0000
commit37552892e0d3930d7724722c8d3c12961dc605ab (patch)
tree9c67f15b7bb6c250dfa1b5f3e591454faf30a82f /src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
parent3774a333d32ec053145bc9677a0cae3f7bd4522c (diff)
downloadmongo-37552892e0d3930d7724722c8d3c12961dc605ab.tar.gz
SERVER-56111 [SBE] A lost cursor position is not detected for tailable cursors
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp64
1 files changed, 41 insertions, 23 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index 89e80a12769..02b1580a7b3 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -412,9 +412,12 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
}
/**
- * Generates a generic collecion scan sub-tree. If a resume token has been provided, the scan will
- * start from a RecordId contained within this token, otherwise from the beginning of the
- * collection.
+ * Generates a generic collection scan sub-tree.
+ * - If a resume token has been provided, the scan will start from a RecordId contained within this
+ * token.
+ * - Else if 'isTailableResumeBranch' is true, the scan will start from a RecordId contained in
+ * slot "resumeRecordId".
+ * - Otherwise the scan will start from the beginning of the collection.
*/
std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollScan(
OperationContext* opCtx,
@@ -434,13 +437,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
auto resultSlot = slotIdGenerator->generate();
auto recordIdSlot = slotIdGenerator->generate();
- auto seekRecordIdSlot = [&]() -> boost::optional<sbe::value::SlotId> {
+ auto [seekRecordIdSlot, seekRecordIdExpression] =
+ [&]() -> std::pair<boost::optional<sbe::value::SlotId>, std::unique_ptr<sbe::EExpression>> {
if (csn->resumeAfterRecordId) {
- return slotIdGenerator->generate();
+ return {
+ slotIdGenerator->generate(),
+ makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong())};
} else if (isTailableResumeBranch) {
auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
- invariant(resumeRecordIdSlot);
- return resumeRecordIdSlot;
+ return {resumeRecordIdSlot, makeVariable(resumeRecordIdSlot)};
}
return {};
}();
@@ -470,7 +475,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
// loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to
// the inner side (scan). We will also construct a union sub-tree as an outer side of the loop
// join to implement the check that the record we're trying to reposition the scan exists.
- if (seekRecordIdSlot && !isTailableResumeBranch) {
+ if (seekRecordIdSlot) {
// Project out the RecordId we want to resume from as 'seekSlot'.
auto seekSlot = slotIdGenerator->generate();
auto projStage = sbe::makeProjectStage(
@@ -478,7 +483,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
csn->nodeId(),
seekSlot,
- makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong()));
+ std::move(seekRecordIdExpression));
// Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
// the branch will output the 'seekSlot' to start the real scan from, otherwise it will
@@ -507,17 +512,24 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
// Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch
// must have the same number of slots, and we use just one in the 'seek' branch above. This
// branch will only be executed if the 'seek' branch produces EOF, which can only happen if
- // if the seek did not find the record id specified in $_resumeAfter.
+ // the seek did not find the resume record of a tailable cursor or the record id specified
+ // in $_resumeAfter.
auto unusedSlot = slotIdGenerator->generate();
- auto failBranch = sbe::makeProjectStage(
- sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
- csn->nodeId(),
- unusedSlot,
- sbe::makeE<sbe::EFail>(
- ErrorCodes::KeyNotFound,
- str::stream() << "Failed to resume collection scan: the recordId from which we are "
- << "attempting to resume no longer exists in the collection: "
- << csn->resumeAfterRecordId));
+ auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> {
+ if (isTailableResumeBranch) {
+ return {ErrorCodes::CappedPositionLost,
+ "CollectionScan died due to failure to restore tailable cursor position."};
+ }
+ return {
+ ErrorCodes::ErrorCodes::KeyNotFound,
+ str::stream() << "Failed to resume collection scan the recordId from which we are "
+ "attempting to resume no longer exists in the collection: "
+ << csn->resumeAfterRecordId};
+ }();
+ auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
+ csn->nodeId(),
+ unusedSlot,
+ sbe::makeE<sbe::EFail>(errorCode, errorMessage));
// Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will
// ever produce a single call to getNext() due to a 'limit 1' sitting on top of it.
@@ -528,12 +540,18 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
sbe::makeSV(*seekRecordIdSlot),
csn->nodeId());
- // Construct the final loop join. Note that we also inject a 'skip 1' stage on top of the
- // inner branch, as we need to start _after_ the resume RecordId, and a 'limit 1' stage on
- // top of the outer branch, as it should produce just a single seek recordId.
+ // Construct the final loop join. Note that for the resume branch of a tailable cursor case
+ // we use the 'seek' stage as an inner branch, since we need to produce all records starting
+ // from the supplied position. For a resume token case we also inject a 'skip 1' stage on
+ // top of the inner branch, as we need to start _after_ the resume RecordId. In both cases
+ // we inject a 'limit 1' stage on top of the outer branch, as it should produce just a
+ // single seek recordId.
+ auto innerStage = isTailableResumeBranch
+ ? std::move(stage)
+ : sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId());
stage = sbe::makeS<sbe::LoopJoinStage>(
sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()),
- sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId()),
+ std::move(innerStage),
sbe::makeSV(),
sbe::makeSV(*seekRecordIdSlot),
nullptr,