summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2021-01-21 23:06:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-15 16:18:24 +0000
commit394572f340bd06f5306a2e961cd9d4e0d3e96bb1 (patch)
tree85d6b30dcb8e142676192da192f7df70d27f58b7 /src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
parent16aff18cbb6b993ac325a50a59b1898d35cf08c3 (diff)
downloadmongo-394572f340bd06f5306a2e961cd9d4e0d3e96bb1.tar.gz
SERVER-50580 SBE should obey ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG flag
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp158
1 files changed, 128 insertions, 30 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index c277107c21b..252a5be9ccf 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -190,9 +190,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
csn->nodeId(),
*seekRecordIdSlot,
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::RecordId,
- sbe::value::bitcastFrom<int64_t>(seekRecordId->asLong()))),
+ makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->asLong())),
std::move(stage),
sbe::makeSV(),
sbe::makeSV(*seekRecordIdSlot),
@@ -200,6 +198,112 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
csn->nodeId());
}
+ // Create a filter which checks the first document to ensure either that its 'ts' is less than
+ // or equal to minTs, or that it is a replica set initialization message. If this fails, then we
+ // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch
+ // of a tailable scan; it only needs to be done once, when the initial branch is run.
+ if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) {
+ // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can
+ // be incorrectly requested by the user, but in that case we should already have uasserted.
+ invariant(csn->shouldTrackLatestOplogTimestamp);
+ invariant(csn->minTs);
+
+ // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot'
+ // here so that it does not shadow the 'tsSlot' which we allocated earlier.
+ auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
+
+ // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot'
+ // already allocated for the existing scan that we created previously.
+ invariant(minTsSlot);
+ invariant(tsSlot);
+
+ // Our filter will also need to see the 'op' and 'o.msg' fields.
+ auto opTypeSlot = slotIdGenerator->generate();
+ auto oObjSlot = slotIdGenerator->generate();
+ minTsSlots.push_back(opTypeSlot);
+ minTsSlots.push_back(oObjSlot);
+ fields.push_back("op");
+ fields.push_back("o");
+
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't
+ // matter if its timestamp is later than the specified minTs; no events earlier than the
+ // minTs can have fallen off this oplog. Otherwise, we must verify that the timestamp of the
+ // first observed oplog entry is earlier than or equal to the minTs time.
+ //
+ // To achieve this, we build a two-branch union subtree. The left branch is a scan with a
+ // filter that checks the first entry in the oplog for the above criteria, throws via EFail
+ // if they are not met, and EOFs otherwise. The right branch of the union plan is the tree
+ // that we originally built above.
+ //
+ // union [s9, s10, s11] [
+ // [s6, s7, s8] efilter {if (ts <= minTs || op == "n" && isObject (o) &&
+ // getField (o, "msg") == "initiating set", false, fail ( 326 ))}
+ // scan [s6 = ts, s7 = op, s8 = o] @oplog,
+ // <stage>
+
+ // Set up the filter stage to be used in the left branch of the union. If the main body of
+ // the expression does not match the input document, it throws OplogQueryMinTsMissing. If
+ // the expression does match, then it returns 'false', which causes the filter (and as a
+ // result, the branch) to EOF immediately. Note that the resultSlot and recordIdSlot
+ // arguments to the ScanStage are boost::none, as we do not need them.
+ auto minTsBranch = sbe::makeS<sbe::FilterStage<false, true>>(
+ sbe::makeS<sbe::ScanStage>(nss,
+ boost::none,
+ boost::none,
+ std::move(fields),
+ minTsSlots, /* don't move this */
+ boost::none,
+ true /* forward */,
+ yieldPolicy,
+ csn->nodeId(),
+ lockAcquisitionCallback),
+ sbe::makeE<sbe::EIf>(
+ makeBinaryOp(
+ sbe::EPrimBinary::logicOr,
+ makeBinaryOp(
+ sbe::EPrimBinary::lessEq,
+ makeVariable(*minTsSlot),
+ makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())),
+ makeBinaryOp(
+ sbe::EPrimBinary::logicAnd,
+ makeBinaryOp(
+ sbe::EPrimBinary::eq, makeVariable(opTypeSlot), makeConstant("n")),
+ makeBinaryOp(sbe::EPrimBinary::logicAnd,
+ makeFunction("isObject", makeVariable(oObjSlot)),
+ makeBinaryOp(sbe::EPrimBinary::eq,
+ makeFunction("getField",
+ makeVariable(oObjSlot),
+ makeConstant("msg")),
+ makeConstant(repl::kInitiatingSetMsg))))),
+ makeConstant(sbe::value::TypeTags::Boolean, false),
+ sbe::makeE<sbe::EFail>(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog")),
+ csn->nodeId());
+
+ // All branches of the UnionStage must have the same number of input and output slots, and
+ // we want to remap all slots from the basic scan we constructed earlier through the union
+ // stage to the output. We're lucky that the real scan happens to have the same number of
+ // slots (resultSlot, recordSlot, tsSlot) as the minTs check branch (minTsSlot, opTypeSlot,
+ // oObjSlot), so we don't have to compensate with any unused slots. Note that the minTsSlots
+ // will never be mapped to output in practice, since the minTs branch either throws or EOFs.
+ //
+ // We also need to update the local variables for each slot to their remapped values, so
+ // subsequent subtrees constructed by this function refer to the correct post-union slots.
+ auto realSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot);
+ resultSlot = slotIdGenerator->generate();
+ recordIdSlot = slotIdGenerator->generate();
+ tsSlot = slotIdGenerator->generate();
+ auto outputSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot);
+
+ // Create the union stage. The left branch, which runs first, is our resumability check.
+ stage = sbe::makeS<sbe::UnionStage>(
+ makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(minTsBranch), std::move(stage)),
+ makeVector<sbe::value::SlotVector>(std::move(minTsSlots), std::move(realSlots)),
+ std::move(outputSlots),
+ csn->nodeId());
+ }
+
// Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater
// than the upper bound.
if (csn->maxTs) {
@@ -210,10 +314,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
stage = sbe::makeS<sbe::FilterStage<false, true>>(
std::move(stage),
makeBinaryOp(sbe::EPrimBinary::lessEq,
- sbe::makeE<sbe::EVariable>(*tsSlot),
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::Timestamp,
- sbe::value::bitcastFrom<uint64_t>((*csn->maxTs).asULL()))),
+ makeVariable(*tsSlot),
+ makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())),
csn->nodeId());
}
@@ -226,16 +328,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
relevantSlots.push_back(*tsSlot);
}
- auto [_, filterStage] = generateFilter(opCtx,
- csn->filter.get(),
- std::move(stage),
- slotIdGenerator,
- frameIdGenerator,
- resultSlot,
- env,
- std::move(relevantSlots),
- csn->nodeId());
- stage = std::move(filterStage);
+ std::tie(std::ignore, stage) = generateFilter(opCtx,
+ csn->filter.get(),
+ std::move(stage),
+ slotIdGenerator,
+ frameIdGenerator,
+ resultSlot,
+ env,
+ std::move(relevantSlots),
+ csn->nodeId());
// We may be requested to stop applying the filter after the first match. This can happen
// if the query is just a lower bound on 'ts' on a forward scan. In this case every document
@@ -366,9 +467,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
csn->nodeId(),
seekSlot,
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::RecordId,
- sbe::value::bitcastFrom<int64_t>(csn->resumeAfterRecordId->asLong())));
+ makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->asLong()));
// Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
// the branch will output the 'seekSlot' to start the real scan from, otherwise it will
@@ -437,16 +536,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
relevantSlots.push_back(*tsSlot);
}
- auto [_, filterStage] = generateFilter(opCtx,
- csn->filter.get(),
- std::move(stage),
- slotIdGenerator,
- frameIdGenerator,
- resultSlot,
- env,
- std::move(relevantSlots),
- csn->nodeId());
- stage = std::move(filterStage);
+ std::tie(std::ignore, stage) = generateFilter(opCtx,
+ csn->filter.get(),
+ std::move(stage),
+ slotIdGenerator,
+ frameIdGenerator,
+ resultSlot,
+ env,
+ std::move(relevantSlots),
+ csn->nodeId());
}
PlanStageSlots outputs;