diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 284 |
1 files changed, 166 insertions, 118 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 63c1a6aa697..bd045c33801 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -108,19 +108,139 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const CollectionPtr& collection, return {}; } +// If the scan should be started after the provided resume RecordId, we will construct a nested-loop +// join sub-tree to project out the resume RecordId and feed it into the inner side (scan). We will +// also construct a union sub-tree as an outer side of the loop join to implement the check that the +// record we're trying to reposition the scan exists. +// +// nlj [] [seekRecordIdSlot] +// left +// limit 1 +// union [seekRecordIdSlot] +// [seekSlot] +// nlj +// left +// project seekSlot = <seekRecordIdExpression> +// limit 1 +// coscan +// right +// seek seekSlot ... +// [unusedSlot] +// project unusedSlot = efail(KeyNotFound) +// limit 1 +// coscan +// right +// skip 1 +// <inputStage> +std::unique_ptr<sbe::PlanStage> buildResumeFromRecordIdSubtree( + StageBuilderState& state, + const CollectionPtr& collection, + const CollectionScanNode* csn, + std::unique_ptr<sbe::PlanStage> inputStage, + sbe::value::SlotId seekRecordIdSlot, + std::unique_ptr<sbe::EExpression> seekRecordIdExpression, + PlanYieldPolicy* yieldPolicy, + bool isTailableResumeBranch, + bool resumeAfterRecordId, + sbe::LockAcquisitionCallback lockAcquisitionCallback) { + invariant(seekRecordIdExpression); + + const auto forward = csn->direction == CollectionScanParams::FORWARD; + // Project out the RecordId we want to resume from as 'seekSlot'. + auto seekSlot = state.slotId(); + auto projStage = sbe::makeProjectStage( + sbe::makeS<sbe::LimitSkipStage>( + sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), + csn->nodeId(), + seekSlot, + std::move(seekRecordIdExpression)); + + // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, + // the branch will output the 'seekSlot' to start the real scan from, otherwise it will + // produce EOF. + auto seekBranch = + sbe::makeS<sbe::LoopJoinStage>(std::move(projStage), + sbe::makeS<sbe::ScanStage>(collection->uuid(), + boost::none /* recordSlot */, + boost::none /* recordIdSlot*/, + boost::none /* snapshotIdSlot */, + boost::none /* indexIdSlot */, + boost::none /* indexKeySlot */, + boost::none /* keyPatternSlot */, + boost::none /* oplogTsSlot */, + std::vector<std::string>{}, + sbe::makeSV(), + seekSlot, + forward, + yieldPolicy, + csn->nodeId(), + lockAcquisitionCallback), + sbe::makeSV(seekSlot), + sbe::makeSV(seekSlot), + nullptr, + csn->nodeId()); + + // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch must + // have the same number of slots, and we use just one in the 'seek' branch above. This branch + // will only be executed if the 'seek' branch produces EOF, which can only happen if the seek + // did not find the resume record of a tailable cursor or the record id specified in + // $_resumeAfter. + auto unusedSlot = state.slotId(); + auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> { + if (isTailableResumeBranch) { + return {ErrorCodes::CappedPositionLost, + "CollectionScan died due to failure to restore tailable cursor position."}; + } + return {ErrorCodes::ErrorCodes::KeyNotFound, + str::stream() << "Failed to resume collection scan the recordId from which we are " + "attempting to resume no longer exists in the collection: " + << csn->resumeAfterRecordId}; + }(); + auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()), + csn->nodeId(), + unusedSlot, + sbe::makeE<sbe::EFail>(errorCode, errorMessage)); + + // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will ever + // produce a single call to getNext() due to a 'limit 1' sitting on top of it. + auto unionStage = sbe::makeS<sbe::UnionStage>( + makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), std::move(failBranch)), + std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, + sbe::makeSV(seekRecordIdSlot), + csn->nodeId()); + + // Construct the final loop join. Note that for the resume branch of a tailable cursor case we + // use the 'seek' stage as an inner branch, since we need to produce all records starting from + // the supplied position. For a resume token case we also inject a 'skip 1' stage on top of the + // inner branch, as we need to start _after_ the resume RecordId. In both cases we inject a + // 'limit 1' stage on top of the outer branch, as it should produce just a single seek recordId. + auto innerStage = isTailableResumeBranch || !resumeAfterRecordId + ? std::move(inputStage) + : sbe::makeS<sbe::LimitSkipStage>(std::move(inputStage), boost::none, 1, csn->nodeId()); + return sbe::makeS<sbe::LoopJoinStage>( + sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()), + std::move(innerStage), + sbe::makeSV(), + sbe::makeSV(seekRecordIdSlot), + nullptr, + csn->nodeId()); +} + /** * Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan - * when there is a predicted on the 'ts' field of the oplog collection. + * when any of the following scenarios apply: * - * 1. If a lower bound on 'ts' is present, the collection scan will seek directly to the RecordId - * of an oplog entry as close to this lower bound as possible without going higher. - * 1.1 If the query is just a lower bound on 'ts' on a forward scan, every document in the - * collection after the first matching one must also match. To avoid wasting time - * running the filter on every document to be returned, we will stop applying the filter - * once it finds the first match. - * 2. If an upper bound on 'ts' is present, the collection scan will stop and return EOF the first - * time it fetches a document that does not pass the filter and has 'ts' greater than the upper - * bound. + * 1. There is a predicted on the 'ts' field of the oplog collection. + * 1.1 If a lower bound on 'ts' is present, the collection scan will seek directly to the + * RecordId of an oplog entry as close to this lower bound as possible without going higher. + * 1.2 If the query is *only* a lower bound on 'ts' on a forward scan, every document in the + * collection after the first matching one must also match. To avoid wasting time running the + * filter on every document to be returned, we will stop applying the filter once it finds + * the first match. + * 1.3 If an upper bound on 'ts' is present, the collection scan will stop and return EOF the + * first time it fetches a document that does not pass the filter and has 'ts' greater than + * the upper bound. + * 2. The user request specified a $_resumeAfter recordId from which to begin the scan. */ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplogScan( StageBuilderState& state, @@ -130,9 +250,11 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo bool isTailableResumeBranch, sbe::LockAcquisitionCallback lockAcquisitionCallback) { invariant(collection->ns().isOplog()); - // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId and can - // only be done for a forward scan. - invariant(!csn->resumeAfterRecordId); + // We can apply oplog scan optimizations only when at least one of the following was specified. + invariant(csn->resumeAfterRecordId || csn->minRecord || csn->maxRecord); + // The minRecord and maxRecord optimizations are not compatible with resumeAfterRecordId. + invariant(!(csn->resumeAfterRecordId && (csn->minRecord || csn->maxRecord))); + // Oplog scan optimizations can only be done for a forward scan. invariant(csn->direction == CollectionScanParams::FORWARD); auto resultSlot = state.slotId(); @@ -142,17 +264,22 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // Otherwise, if we're building a collection scan for a resume branch of a special union // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly // to access the recordId to resume the scan from. - auto [seekRecordId, seekRecordIdSlot] = - [&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> { + auto [seekRecordIdSlot, seekRecordIdExpression] = + [&]() -> std::pair<boost::optional<sbe::value::SlotId>, std::unique_ptr<sbe::EExpression>> { if (isTailableResumeBranch) { auto resumeRecordIdSlot = state.env->getSlot("resumeRecordId"_sd); - return {{}, resumeRecordIdSlot}; + return {resumeRecordIdSlot, makeVariable(resumeRecordIdSlot)}; + } else if (csn->resumeAfterRecordId) { + return { + state.slotId(), + makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->getLong())}; } else if (csn->minRecord) { auto cursor = collection->getRecordStore()->getCursor(state.opCtx); auto startRec = cursor->seekNear(*csn->minRecord); if (startRec) { LOGV2_DEBUG(205841, 3, "Using direct oplog seek"); - return {startRec->id, state.slotId()}; + return {state.slotId(), + makeConstant(sbe::value::TypeTags::RecordId, startRec->id.getLong())}; } } return {}; @@ -185,22 +312,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo std::move(callbacks)); // Start the scan from the seekRecordId. - if (seekRecordId) { - invariant(seekRecordIdSlot); - - // Project the start RecordId as a seekRecordIdSlot and feed it to the inner side (scan). - stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>( - sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), - csn->nodeId(), - *seekRecordIdSlot, - makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->getLong())), - std::move(stage), - sbe::makeSV(), - sbe::makeSV(*seekRecordIdSlot), - nullptr, - csn->nodeId()); + if (seekRecordIdSlot) { + stage = buildResumeFromRecordIdSubtree(state, + collection, + csn, + std::move(stage), + *seekRecordIdSlot, + std::move(seekRecordIdExpression), + yieldPolicy, + isTailableResumeBranch, + csn->resumeAfterRecordId.has_value(), + lockAcquisitionCallback); } // Create a filter which checks the first document to ensure either that its 'ts' is less than @@ -363,6 +485,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo // inner branch, and the execution will continue from this point further on, without // applying the filter. if (csn->stopApplyingFilterAfterFirstMatch) { + invariant(!csn->maxRecord); invariant(csn->direction == CollectionScanParams::FORWARD); seekRecordIdSlot = recordIdSlot; @@ -464,92 +587,17 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc csn->nodeId(), std::move(callbacks)); - // Check if the scan should be started after the provided resume RecordId and construct a nested - // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to - // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop - // join to implement the check that the record we're trying to reposition the scan exists. if (seekRecordIdSlot) { - // Project out the RecordId we want to resume from as 'seekSlot'. - auto seekSlot = state.slotId(); - auto projStage = sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>( - sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), - csn->nodeId(), - seekSlot, - std::move(seekRecordIdExpression)); - - // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, - // the branch will output the 'seekSlot' to start the real scan from, otherwise it will - // produce EOF. - auto seekBranch = sbe::makeS<sbe::LoopJoinStage>( - std::move(projStage), - sbe::makeS<sbe::ScanStage>(collection->uuid(), - boost::none /* recordSlot */, - boost::none /* recordIdSlot*/, - boost::none /* snapshotIdSlot */, - boost::none /* indexIdSlot */, - boost::none /* indexKeySlot */, - boost::none /* keyPatternSlot */, - boost::none /* oplogTsSlot */, - std::vector<std::string>{}, - sbe::makeSV(), - seekSlot, - forward, - yieldPolicy, - csn->nodeId(), - lockAcquisitionCallback), - sbe::makeSV(seekSlot), - sbe::makeSV(seekSlot), - nullptr, - csn->nodeId()); - - // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch - // must have the same number of slots, and we use just one in the 'seek' branch above. This - // branch will only be executed if the 'seek' branch produces EOF, which can only happen if - // the seek did not find the resume record of a tailable cursor or the record id specified - // in $_resumeAfter. - auto unusedSlot = state.slotId(); - auto [errorCode, errorMessage] = [&]() -> std::pair<ErrorCodes::Error, std::string> { - if (isTailableResumeBranch) { - return {ErrorCodes::CappedPositionLost, - "CollectionScan died due to failure to restore tailable cursor position."}; - } - return { - ErrorCodes::ErrorCodes::KeyNotFound, - str::stream() << "Failed to resume collection scan the recordId from which we are " - "attempting to resume no longer exists in the collection: " - << csn->resumeAfterRecordId}; - }(); - auto failBranch = sbe::makeProjectStage(sbe::makeS<sbe::CoScanStage>(csn->nodeId()), - csn->nodeId(), - unusedSlot, - sbe::makeE<sbe::EFail>(errorCode, errorMessage)); - - // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will - // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it. - auto unionStage = sbe::makeS<sbe::UnionStage>( - makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), - std::move(failBranch)), - std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, - sbe::makeSV(*seekRecordIdSlot), - csn->nodeId()); - - // Construct the final loop join. Note that for the resume branch of a tailable cursor case - // we use the 'seek' stage as an inner branch, since we need to produce all records starting - // from the supplied position. For a resume token case we also inject a 'skip 1' stage on - // top of the inner branch, as we need to start _after_ the resume RecordId. In both cases - // we inject a 'limit 1' stage on top of the outer branch, as it should produce just a - // single seek recordId. - auto innerStage = isTailableResumeBranch - ? std::move(stage) - : sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1, csn->nodeId()); - stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none, csn->nodeId()), - std::move(innerStage), - sbe::makeSV(), - sbe::makeSV(*seekRecordIdSlot), - nullptr, - csn->nodeId()); + stage = buildResumeFromRecordIdSubtree(state, + collection, + csn, + std::move(stage), + *seekRecordIdSlot, + std::move(seekRecordIdExpression), + yieldPolicy, + isTailableResumeBranch, + true, /* resumeAfterRecordId */ + lockAcquisitionCallback); } if (csn->filter) { |