diff options
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 111 |
1 files changed, 71 insertions, 40 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index fbfcef2cca6..85fe75f5508 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -81,6 +81,24 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const Collection* collection, } /** + * If 'shouldTrackLatestOplogTimestamp' returns a vector holding the name of the oplog 'ts' field + * along with another vector holding a SlotId to map this field to, as well as the standalone value + * of the same SlotId (the latter is returned purely for convenience purposes). + */ +std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>> +makeOplogTimestampSlotsIfNeeded(const Collection* collection, + sbe::value::SlotIdGenerator* slotIdGenerator, + bool shouldTrackLatestOplogTimestamp) { + if (shouldTrackLatestOplogTimestamp) { + invariant(collection->ns().isOplog()); + + auto tsSlot = slotIdGenerator->generate(); + return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; + } + return {}; +}; + +/** * Creates a collection scan sub-tree optimized for oplog scans. We can built an optimized scan * when there is a predicted on the 'ts' field of the oplog collection. * @@ -103,6 +121,8 @@ generateOptimizedOplogScan(OperationContext* opCtx, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { invariant(collection->ns().isOplog()); // The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only @@ -115,9 +135,15 @@ generateOptimizedOplogScan(OperationContext* opCtx, // See if the RecordStore supports the oplogStartHack. If so, the scan will start from the // RecordId stored in seekRecordId. + // Otherwise, if we're building a collection scan for a resume branch of a special union + // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly + // to access the recordId to resume the scan from. auto [seekRecordId, seekRecordIdSlot] = [&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> { - if (csn->minTs) { + if (isTailableResumeBranch) { + auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd); + return {{}, resumeRecordIdSlot}; + } else if (csn->minTs) { auto goal = oploghack::keyForOptime(*csn->minTs); if (goal.isOK()) { auto startLoc = @@ -134,18 +160,10 @@ generateOptimizedOplogScan(OperationContext* opCtx, // Check if we need to project out an oplog 'ts' field as part of the collection scan. We will // need it either when 'maxTs' bound has been provided, so that we can apply an EOF filter, of // if we need to track the latest oplog timestamp. - auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>, - sbe::value::SlotVector, - boost::optional<sbe::value::SlotId>> { - // Don't project the 'ts' if stopApplyingFilterAfterFirstMatch is 'true'. We will have - // another scan stage where it will be done. - if (!csn->stopApplyingFilterAfterFirstMatch && - (csn->maxTs || csn->shouldTrackLatestOplogTimestamp)) { - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; - } - return {}; - }(); + const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch && + (csn->maxTs || csn->shouldTrackLatestOplogTimestamp); + auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, shouldTrackLatestOplogTimestamp); NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()}; auto stage = sbe::makeS<sbe::ScanStage>(nss, @@ -217,16 +235,11 @@ generateOptimizedOplogScan(OperationContext* opCtx, // inner branch, and the execution will continue from this point further on, without // applying the filter. if (csn->stopApplyingFilterAfterFirstMatch) { - std::tie(fields, slots, tsSlot) = - [&]() -> std::tuple<std::vector<std::string>, - sbe::value::SlotVector, - boost::optional<sbe::value::SlotId>> { - if (csn->shouldTrackLatestOplogTimestamp) { - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; - } - return {}; - }(); + invariant(csn->minTs || csn->maxTs); + invariant(csn->direction == CollectionScanParams::FORWARD); + + std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); seekRecordIdSlot = recordIdSlot; resultSlot = slotIdGenerator->generate(); @@ -268,27 +281,32 @@ generateGenericCollScan(const Collection* collection, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { const auto forward = csn->direction == CollectionScanParams::FORWARD; + invariant(!csn->shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + invariant(!csn->resumeAfterRecordId || forward); + invariant(!csn->resumeAfterRecordId || !csn->tailable); + auto resultSlot = slotIdGenerator->generate(); auto recordIdSlot = slotIdGenerator->generate(); - auto seekRecordIdSlot = boost::make_optional(static_cast<bool>(csn->resumeAfterRecordId), - slotIdGenerator->generate()); - - // See if we need to project out an oplog latest timestamp. - auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>, - sbe::value::SlotVector, - boost::optional<sbe::value::SlotId>> { - if (csn->shouldTrackLatestOplogTimestamp) { - invariant(collection->ns().isOplog()); - - auto tsSlot = slotIdGenerator->generate(); - return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot}; + auto seekRecordIdSlot = [&]() -> boost::optional<sbe::value::SlotId> { + if (csn->resumeAfterRecordId) { + return slotIdGenerator->generate(); + } else if (isTailableResumeBranch) { + auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd); + invariant(resumeRecordIdSlot); + return resumeRecordIdSlot; } return {}; }(); + // See if we need to project out an oplog latest timestamp. + auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); + NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()}; auto stage = sbe::makeS<sbe::ScanStage>(nss, resultSlot, @@ -310,7 +328,7 @@ generateGenericCollScan(const Collection* collection, // // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on // seekRecordIdSlot. - if (seekRecordIdSlot) { + if (seekRecordIdSlot && !isTailableResumeBranch) { stage = sbe::makeS<sbe::LoopJoinStage>( sbe::makeProjectStage( sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none), @@ -345,15 +363,28 @@ generateCollScan(OperationContext* opCtx, const CollectionScanNode* csn, sbe::value::SlotIdGenerator* slotIdGenerator, PlanYieldPolicy* yieldPolicy, + sbe::RuntimeEnvironment* env, + bool isTailableResumeBranch, TrialRunProgressTracker* tracker) { - uassert(4822889, "Tailable collection scans are not supported in SBE", !csn->tailable); auto [resultSlot, recordIdSlot, oplogTsSlot, stage] = [&]() { if (csn->minTs || csn->maxTs) { - return generateOptimizedOplogScan( - opCtx, collection, csn, slotIdGenerator, yieldPolicy, tracker); + return generateOptimizedOplogScan(opCtx, + collection, + csn, + slotIdGenerator, + yieldPolicy, + env, + isTailableResumeBranch, + tracker); } else { - return generateGenericCollScan(collection, csn, slotIdGenerator, yieldPolicy, tracker); + return generateGenericCollScan(collection, + csn, + slotIdGenerator, + yieldPolicy, + env, + isTailableResumeBranch, + tracker); } }(); |