summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp284
1 files changed, 166 insertions, 118 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index 63c1a6aa697..bd045c33801 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -108,19 +108,139 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection,
return {};
}
+// If the scan should be started after the provided resume RecordId, we will construct a nested-loop
+// join sub-tree to project out the resume RecordId and feed it into the inner side (scan). We will
+// also construct a union sub-tree as an outer side of the loop join to implement the check that the
+// record we're trying to reposition the scan exists.
+//
+// nlj [] [seekRecordIdSlot]
+// left
+// limit 1
+// union [seekRecordIdSlot]
+// [seekSlot]
+// nlj
+// left
+// project seekSlot = <seekRecordIdExpression>
+// limit 1
+// coscan
+// right
+// seek seekSlot ...
+// [unusedSlot]
+// project unusedSlot = efail(KeyNotFound)
+// limit 1
+// coscan
+// right
+// skip 1
+// <inputStage>
+std::unique_ptr<sbe::PlanStage> buildResumeFromRecordIdSubtree(
+ StageBuilderState& state,
+ const CollectionPtr& collection,
+ const CollectionScanNode* csn,
+ std::unique_ptr<sbe::PlanStage> inputStage,
+ sbe::value::SlotId seekRecordIdSlot,
+ std::unique_ptr<sbe::EExpression> seekRecordIdExpression,
+ PlanYieldPolicy* yieldPolicy,
+ bool isTailableResumeBranch,
+ bool resumeAfterRecordId,
+ sbe::LockAcquisitionCallback lockAcquisitionCallback) {
+ invariant(seekRecordIdExpression);
+
+ const auto forward = csn->direction == CollectionScanParams::FORWARD;
+ // Project out the RecordId we want to resume from as 'seekSlot'.
+ auto seekSlot = state.slotId();
+ auto projStage = sbe::makeProjectStage(
+ sbe::makeS<sbe::LimitSkipStage>(
+ sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
+ csn->nodeId(),
+ seekSlot,
+ std::move(seekRecordIdExpression));
+
+ // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
+ // the branch will output the 'seekSlot' to start the real scan from, otherwise it will
+ // produce EOF.
+ auto seekBranch =
+ sbe::makeS<sbe::LoopJoinStage>(std::move(projStage),
+ sbe::makeS<sbe::ScanStage>(collection->uuid(),
+ boost::none /* recordSlot */,
+ boost::none /* recordIdSlot*/,
+ boost::none /* snapshotIdSlot */,
+ boost::none /* indexIdSlot */,
+ boost::none /* indexKeySlot */,
+ boost::none /* keyPatternSlot */,
+ boost::none /* oplogTsSlot */,
+ std::vector<std::string>{},
+ sbe::makeSV(),
+ seekSlot,
+ forward,
+ yieldPolicy,
+ csn->nodeId(),
+ lockAcquisitionCallback),
+ sbe::makeSV(seekSlot),
+ sbe::makeSV(seekSlot),
+ nullptr,
+ csn->nodeId());
+
+ // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch must
+ // have the same number of slots, and we use just one in the 'seek' branch above. This branch
+ // will only be executed if the 'seek' branch produces EOF, which can only happen if the seek
+ // did not find the resume record of a tailable cursor or the record id specified in
+ // $_resumeAfter.
+ auto unusedSlot = state.slotId();
+ auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> {
+ if (isTailableResumeBranch) {
+ return {ErrorCodes::CappedPositionLost,
+ "CollectionScan died due to failure to restore tailable cursor position."};
+ }
+ return {ErrorCodes::ErrorCodes::KeyNotFound,
+ str::stream() << "Failed to resume collection scan the recordId from which we are "
+ "attempting to resume no longer exists in the collection: "
+ << csn->resumeAfterRecordId};
+ }();
+ auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
+ csn->nodeId(),
+ unusedSlot,
+ sbe::makeE<sbe::EFail>(errorCode, errorMessage));
+
+ // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will ever
+ // produce a single call to getNext() due to a 'limit 1' sitting on top of it.
+ auto unionStage = sbe::makeS<sbe::UnionStage>(
+ makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), std::move(failBranch)),
+ std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)},
+ sbe::makeSV(seekRecordIdSlot),
+ csn->nodeId());
+
+ // Construct the final loop join. Note that for the resume branch of a tailable cursor case we
+ // use the 'seek' stage as an inner branch, since we need to produce all records starting from
+ // the supplied position. For a resume token case we also inject a 'skip 1' stage on top of the
+ // inner branch, as we need to start _after_ the resume RecordId. In both cases we inject a
+ // 'limit 1' stage on top of the outer branch, as it should produce just a single seek recordId.
+ auto innerStage = isTailableResumeBranch || !resumeAfterRecordId
+ ? std::move(inputStage)
+ : sbe::makeS<sbe::LimitSkipStage>(std::move(inputStage), boost::none, 1, csn->nodeId());
+ return sbe::makeS<sbe::LoopJoinStage>(
+ sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()),
+ std::move(innerStage),
+ sbe::makeSV(),
+ sbe::makeSV(seekRecordIdSlot),
+ nullptr,
+ csn->nodeId());
+}
+
/**
* Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan
- * when there is a predicted on the 'ts' field of the oplog collection.
+ * when any of the following scenarios apply:
*
- * 1. If a lower bound on 'ts' is present, the collection scan will seek directly to the RecordId
- * of an oplog entry as close to this lower bound as possible without going higher.
- * 1.1 If the query is just a lower bound on 'ts' on a forward scan, every document in the
- * collection after the first matching one must also match. To avoid wasting time
- * running the filter on every document to be returned, we will stop applying the filter
- * once it finds the first match.
- * 2. If an upper bound on 'ts' is present, the collection scan will stop and return EOF the first
- * time it fetches a document that does not pass the filter and has 'ts' greater than the upper
- * bound.
+ * 1. There is a predicted on the 'ts' field of the oplog collection.
+ * 1.1 If a lower bound on 'ts' is present, the collection scan will seek directly to the
+ * RecordId of an oplog entry as close to this lower bound as possible without going higher.
+ * 1.2 If the query is *only* a lower bound on 'ts' on a forward scan, every document in the
+ * collection after the first matching one must also match. To avoid wasting time running the
+ * filter on every document to be returned, we will stop applying the filter once it finds
+ * the first match.
+ * 1.3 If an upper bound on 'ts' is present, the collection scan will stop and return EOF the
+ * first time it fetches a document that does not pass the filter and has 'ts' greater than
+ * the upper bound.
+ * 2. The user request specified a $_resumeAfter recordId from which to begin the scan.
*/
std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplogScan(
StageBuilderState& state,
@@ -130,9 +250,11 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
bool isTailableResumeBranch,
sbe::LockAcquisitionCallback lockAcquisitionCallback) {
invariant(collection->ns().isOplog());
- // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can
- // only be done for a forward scan.
- invariant(!csn->resumeAfterRecordId);
+ // We can apply oplog scan optimizations only when at least one of the following was specified.
+ invariant(csn->resumeAfterRecordId || csn->minRecord || csn->maxRecord);
+ // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId.
+ invariant(!(csn->resumeAfterRecordId && (csn->minRecord || csn->maxRecord)));
+ // Oplog scan optimizations can only be done for a forward scan.
invariant(csn->direction == CollectionScanParams::FORWARD);
auto resultSlot = state.slotId();
@@ -142,17 +264,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// Otherwise, if we're building a collection scan for a resume branch of a special union
// sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly
// to access the recordId to resume the scan from.
- auto [seekRecordId, seekRecordIdSlot] =
- [&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> {
+ auto [seekRecordIdSlot, seekRecordIdExpression] =
+ [&]() -> std::pair<boost::optional<sbe::value::SlotId>, std::unique_ptr<sbe::EExpression>> {
if (isTailableResumeBranch) {
auto resumeRecordIdSlot = state.env->getSlot("resumeRecordId"_sd);
- return {{}, resumeRecordIdSlot};
+ return {resumeRecordIdSlot, makeVariable(resumeRecordIdSlot)};
+ } else if (csn->resumeAfterRecordId) {
+ return {
+ state.slotId(),
+ makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong())};
} else if (csn->minRecord) {
auto cursor = collection->getRecordStore()->getCursor(state.opCtx);
auto startRec = cursor->seekNear(*csn->minRecord);
if (startRec) {
LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
- return {startRec->id, state.slotId()};
+ return {state.slotId(),
+ makeConstant(sbe::value::TypeTags::RecordId, startRec->id.getLong())};
}
}
return {};
@@ -185,22 +312,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
std::move(callbacks));
// Start the scan from the seekRecordId.
- if (seekRecordId) {
- invariant(seekRecordIdSlot);
-
- // Project the start RecordId as a seekRecordIdSlot and feed it to the inner side (scan).
- stage = sbe::makeS<sbe::LoopJoinStage>(
- sbe::makeProjectStage(
- sbe::makeS<sbe::LimitSkipStage>(
- sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
- csn->nodeId(),
- *seekRecordIdSlot,
- makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->getLong())),
- std::move(stage),
- sbe::makeSV(),
- sbe::makeSV(*seekRecordIdSlot),
- nullptr,
- csn->nodeId());
+ if (seekRecordIdSlot) {
+ stage = buildResumeFromRecordIdSubtree(state,
+ collection,
+ csn,
+ std::move(stage),
+ *seekRecordIdSlot,
+ std::move(seekRecordIdExpression),
+ yieldPolicy,
+ isTailableResumeBranch,
+ csn->resumeAfterRecordId.has_value(),
+ lockAcquisitionCallback);
}
// Create a filter which checks the first document to ensure either that its 'ts' is less than
@@ -363,6 +485,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
// inner branch, and the execution will continue from this point further on, without
// applying the filter.
if (csn->stopApplyingFilterAfterFirstMatch) {
+ invariant(!csn->maxRecord);
invariant(csn->direction == CollectionScanParams::FORWARD);
seekRecordIdSlot = recordIdSlot;
@@ -464,92 +587,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
csn->nodeId(),
std::move(callbacks));
- // Check if the scan should be started after the provided resume RecordId and construct a nested
- // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to
- // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop
- // join to implement the check that the record we're trying to reposition the scan exists.
if (seekRecordIdSlot) {
- // Project out the RecordId we want to resume from as 'seekSlot'.
- auto seekSlot = state.slotId();
- auto projStage = sbe::makeProjectStage(
- sbe::makeS<sbe::LimitSkipStage>(
- sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
- csn->nodeId(),
- seekSlot,
- std::move(seekRecordIdExpression));
-
- // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
- // the branch will output the 'seekSlot' to start the real scan from, otherwise it will
- // produce EOF.
- auto seekBranch = sbe::makeS<sbe::LoopJoinStage>(
- std::move(projStage),
- sbe::makeS<sbe::ScanStage>(collection->uuid(),
- boost::none /* recordSlot */,
- boost::none /* recordIdSlot*/,
- boost::none /* snapshotIdSlot */,
- boost::none /* indexIdSlot */,
- boost::none /* indexKeySlot */,
- boost::none /* keyPatternSlot */,
- boost::none /* oplogTsSlot */,
- std::vector<std::string>{},
- sbe::makeSV(),
- seekSlot,
- forward,
- yieldPolicy,
- csn->nodeId(),
- lockAcquisitionCallback),
- sbe::makeSV(seekSlot),
- sbe::makeSV(seekSlot),
- nullptr,
- csn->nodeId());
-
- // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch
- // must have the same number of slots, and we use just one in the 'seek' branch above. This
- // branch will only be executed if the 'seek' branch produces EOF, which can only happen if
- // the seek did not find the resume record of a tailable cursor or the record id specified
- // in $_resumeAfter.
- auto unusedSlot = state.slotId();
- auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> {
- if (isTailableResumeBranch) {
- return {ErrorCodes::CappedPositionLost,
- "CollectionScan died due to failure to restore tailable cursor position."};
- }
- return {
- ErrorCodes::ErrorCodes::KeyNotFound,
- str::stream() << "Failed to resume collection scan the recordId from which we are "
- "attempting to resume no longer exists in the collection: "
- << csn->resumeAfterRecordId};
- }();
- auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()),
- csn->nodeId(),
- unusedSlot,
- sbe::makeE<sbe::EFail>(errorCode, errorMessage));
-
- // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will
- // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it.
- auto unionStage = sbe::makeS<sbe::UnionStage>(
- makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch),
- std::move(failBranch)),
- std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)},
- sbe::makeSV(*seekRecordIdSlot),
- csn->nodeId());
-
- // Construct the final loop join. Note that for the resume branch of a tailable cursor case
- // we use the 'seek' stage as an inner branch, since we need to produce all records starting
- // from the supplied position. For a resume token case we also inject a 'skip 1' stage on
- // top of the inner branch, as we need to start _after_ the resume RecordId. In both cases
- // we inject a 'limit 1' stage on top of the outer branch, as it should produce just a
- // single seek recordId.
- auto innerStage = isTailableResumeBranch
- ? std::move(stage)
- : sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId());
- stage = sbe::makeS<sbe::LoopJoinStage>(
- sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()),
- std::move(innerStage),
- sbe::makeSV(),
- sbe::makeSV(*seekRecordIdSlot),
- nullptr,
- csn->nodeId());
+ stage = buildResumeFromRecordIdSubtree(state,
+ collection,
+ csn,
+ std::move(stage),
+ *seekRecordIdSlot,
+ std::move(seekRecordIdExpression),
+ yieldPolicy,
+ isTailableResumeBranch,
+ true, /* resumeAfterRecordId */
+ lockAcquisitionCallback);
}
if (csn->filter) {