summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp111
1 files changed, 71 insertions, 40 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index fbfcef2cca6..85fe75f5508 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -81,6 +81,24 @@ sbe::ScanOpenCallback makeOpenCallbackIfNeeded(const Collection* collection,
}
/**
+ * Builds the field/slot bookkeeping needed to project the oplog 'ts' field out of a scan.
+ *
+ * When 'shouldTrackLatestOplogTimestamp' is true, 'collection' must be the oplog (enforced with an
+ * invariant). A new SlotId is generated from 'slotIdGenerator' and the returned tuple holds:
+ *   - a vector containing the name of the oplog 'ts' field,
+ *   - a SlotVector containing the SlotId this field is mapped to, and
+ *   - that same SlotId as a standalone value (returned purely for convenience).
+ *
+ * When 'shouldTrackLatestOplogTimestamp' is false, no SlotId is generated and a value-initialized
+ * tuple is returned, so the trailing optional SlotId is not set.
+ */
+std::tuple<std::vector<std::string>, sbe::value::SlotVector, boost::optional<sbe::value::SlotId>>
+makeOplogTimestampSlotsIfNeeded(const Collection* collection,
+                                sbe::value::SlotIdGenerator* slotIdGenerator,
+                                bool shouldTrackLatestOplogTimestamp) {
+    if (shouldTrackLatestOplogTimestamp) {
+        invariant(collection->ns().isOplog());
+
+        auto tsSlot = slotIdGenerator->generate();
+        return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
+    }
+    return {};
+}
+
+/**
* Creates a collection scan sub-tree optimized for oplog scans. We can build an optimized scan
* when there is a predicate on the 'ts' field of the oplog collection.
*
@@ -103,6 +121,8 @@ generateOptimizedOplogScan(OperationContext* opCtx,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
invariant(collection->ns().isOplog());
// The minTs and maxTs optimizations are not compatible with resumeAfterRecordId and can only
@@ -115,9 +135,15 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// See if the RecordStore supports the oplogStartHack. If so, the scan will start from the
// RecordId stored in seekRecordId.
+ // Otherwise, if we're building a collection scan for a resume branch of a special union
+ // sub-tree implementing a tailable cursor scan, we can use the seekRecordIdSlot directly
+ // to access the recordId to resume the scan from.
auto [seekRecordId, seekRecordIdSlot] =
[&]() -> std::pair<boost::optional<RecordId>, boost::optional<sbe::value::SlotId>> {
- if (csn->minTs) {
+ if (isTailableResumeBranch) {
+ auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
+ return {{}, resumeRecordIdSlot};
+ } else if (csn->minTs) {
auto goal = oploghack::keyForOptime(*csn->minTs);
if (goal.isOK()) {
auto startLoc =
@@ -134,18 +160,10 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// Check if we need to project out an oplog 'ts' field as part of the collection scan. We will
// need it either when a 'maxTs' bound has been provided, so that we can apply an EOF filter, or
// if we need to track the latest oplog timestamp.
- auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- // Don't project the 'ts' if stopApplyingFilterAfterFirstMatch is 'true'. We will have
- // another scan stage where it will be done.
- if (!csn->stopApplyingFilterAfterFirstMatch &&
- (csn->maxTs || csn->shouldTrackLatestOplogTimestamp)) {
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
- }
- return {};
- }();
+ const auto shouldTrackLatestOplogTimestamp = !csn->stopApplyingFilterAfterFirstMatch &&
+ (csn->maxTs || csn->shouldTrackLatestOplogTimestamp);
+ auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, shouldTrackLatestOplogTimestamp);
NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()};
auto stage = sbe::makeS<sbe::ScanStage>(nss,
@@ -217,16 +235,11 @@ generateOptimizedOplogScan(OperationContext* opCtx,
// inner branch, and the execution will continue from this point further on, without
// applying the filter.
if (csn->stopApplyingFilterAfterFirstMatch) {
- std::tie(fields, slots, tsSlot) =
- [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- if (csn->shouldTrackLatestOplogTimestamp) {
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
- }
- return {};
- }();
+ invariant(csn->minTs || csn->maxTs);
+ invariant(csn->direction == CollectionScanParams::FORWARD);
+
+ std::tie(fields, slots, tsSlot) = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
seekRecordIdSlot = recordIdSlot;
resultSlot = slotIdGenerator->generate();
@@ -268,27 +281,32 @@ generateGenericCollScan(const Collection* collection,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
const auto forward = csn->direction == CollectionScanParams::FORWARD;
+ invariant(!csn->shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ invariant(!csn->resumeAfterRecordId || forward);
+ invariant(!csn->resumeAfterRecordId || !csn->tailable);
+
auto resultSlot = slotIdGenerator->generate();
auto recordIdSlot = slotIdGenerator->generate();
- auto seekRecordIdSlot = boost::make_optional(static_cast<bool>(csn->resumeAfterRecordId),
- slotIdGenerator->generate());
-
- // See if we need to project out an oplog latest timestamp.
- auto [fields, slots, tsSlot] = [&]() -> std::tuple<std::vector<std::string>,
- sbe::value::SlotVector,
- boost::optional<sbe::value::SlotId>> {
- if (csn->shouldTrackLatestOplogTimestamp) {
- invariant(collection->ns().isOplog());
-
- auto tsSlot = slotIdGenerator->generate();
- return {{repl::OpTime::kTimestampFieldName}, sbe::makeSV(tsSlot), tsSlot};
+ auto seekRecordIdSlot = [&]() -> boost::optional<sbe::value::SlotId> {
+ if (csn->resumeAfterRecordId) {
+ return slotIdGenerator->generate();
+ } else if (isTailableResumeBranch) {
+ auto resumeRecordIdSlot = env->getSlot("resumeRecordId"_sd);
+ invariant(resumeRecordIdSlot);
+ return resumeRecordIdSlot;
}
return {};
}();
+ // See if we need to project out an oplog latest timestamp.
+ auto&& [fields, slots, tsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
+
NamespaceStringOrUUID nss{collection->ns().db().toString(), collection->uuid()};
auto stage = sbe::makeS<sbe::ScanStage>(nss,
resultSlot,
@@ -310,7 +328,7 @@ generateGenericCollScan(const Collection* collection,
//
// TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on
// seekRecordIdSlot.
- if (seekRecordIdSlot) {
+ if (seekRecordIdSlot && !isTailableResumeBranch) {
stage = sbe::makeS<sbe::LoopJoinStage>(
sbe::makeProjectStage(
sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none),
@@ -345,15 +363,28 @@ generateCollScan(OperationContext* opCtx,
const CollectionScanNode* csn,
sbe::value::SlotIdGenerator* slotIdGenerator,
PlanYieldPolicy* yieldPolicy,
+ sbe::RuntimeEnvironment* env,
+ bool isTailableResumeBranch,
TrialRunProgressTracker* tracker) {
- uassert(4822889, "Tailable collection scans are not supported in SBE", !csn->tailable);
auto [resultSlot, recordIdSlot, oplogTsSlot, stage] = [&]() {
if (csn->minTs || csn->maxTs) {
- return generateOptimizedOplogScan(
- opCtx, collection, csn, slotIdGenerator, yieldPolicy, tracker);
+ return generateOptimizedOplogScan(opCtx,
+ collection,
+ csn,
+ slotIdGenerator,
+ yieldPolicy,
+ env,
+ isTailableResumeBranch,
+ tracker);
} else {
- return generateGenericCollScan(collection, csn, slotIdGenerator, yieldPolicy, tracker);
+ return generateGenericCollScan(collection,
+ csn,
+ slotIdGenerator,
+ yieldPolicy,
+ env,
+ isTailableResumeBranch,
+ tracker);
}
}();