diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2021-01-21 23:06:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-15 16:18:24 +0000 |
commit | 394572f340bd06f5306a2e961cd9d4e0d3e96bb1 (patch) | |
tree | 85d6b30dcb8e142676192da192f7df70d27f58b7 /src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | |
parent | 16aff18cbb6b993ac325a50a59b1898d35cf08c3 (diff) | |
download | mongo-394572f340bd06f5306a2e961cd9d4e0d3e96bb1.tar.gz |
SERVER-50580 SBE should obey ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG flag
Diffstat (limited to 'src/mongo/db/query/sbe_stage_builder_coll_scan.cpp')
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 158 |
1 files changed, 128 insertions, 30 deletions
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index c277107c21b..252a5be9ccf 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -190,9 +190,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), csn->nodeId(), *seekRecordIdSlot, - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::RecordId, - sbe::value::bitcastFrom<int64_t>(seekRecordId->asLong()))), + makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->asLong())), std::move(stage), sbe::makeSV(), sbe::makeSV(*seekRecordIdSlot), @@ -200,6 +198,112 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo csn->nodeId()); } + // Create a filter which checks the first document to ensure either that its 'ts' is less than + // or equal to minTs, or that it is a replica set initialization message. If this fails, then we + // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch + // of a tailable scan; it only needs to be done once, when the initial branch is run. + if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) { + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can + // be incorrectly requested by the user, but in that case we should already have uasserted. + invariant(csn->shouldTrackLatestOplogTimestamp); + invariant(csn->minTs); + + // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot' + // here so that it does not shadow the 'tsSlot' which we allocated earlier. + auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); + + // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot' + // already allocated for the existing scan that we created previously. + invariant(minTsSlot); + invariant(tsSlot); + + // Our filter will also need to see the 'op' and 'o.msg' fields. + auto opTypeSlot = slotIdGenerator->generate(); + auto oObjSlot = slotIdGenerator->generate(); + minTsSlots.push_back(opTypeSlot); + minTsSlots.push_back(oObjSlot); + fields.push_back("op"); + fields.push_back("o"); + + // If the first entry we see in the oplog is the replset initialization, then it doesn't + // matter if its timestamp is later than the specified minTs; no events earlier than the + // minTs can have fallen off this oplog. Otherwise, we must verify that the timestamp of the + // first observed oplog entry is earlier than or equal to the minTs time. + // + // To achieve this, we build a two-branch union subtree. The left branch is a scan with a + // filter that checks the first entry in the oplog for the above criteria, throws via EFail + // if they are not met, and EOFs otherwise. The right branch of the union plan is the tree + // that we originally built above. + // + // union [s9, s10, s11] [ + // [s6, s7, s8] efilter {if (ts <= minTs || op == "n" && isObject (o) && + // getField (o, "msg") == "initiating set", false, fail ( 326 ))} + // scan [s6 = ts, s7 = op, s8 = o] @oplog, + // <stage> + + // Set up the filter stage to be used in the left branch of the union. If the main body of + // the expression does not match the input document, it throws OplogQueryMinTsMissing. If + // the expression does match, then it returns 'false', which causes the filter (and as a + // result, the branch) to EOF immediately. Note that the resultSlot and recordIdSlot + // arguments to the ScanStage are boost::none, as we do not need them. + auto minTsBranch = sbe::makeS<sbe::FilterStage<false, true>>( + sbe::makeS<sbe::ScanStage>(nss, + boost::none, + boost::none, + std::move(fields), + minTsSlots, /* don't move this */ + boost::none, + true /* forward */, + yieldPolicy, + csn->nodeId(), + lockAcquisitionCallback), + sbe::makeE<sbe::EIf>( + makeBinaryOp( + sbe::EPrimBinary::logicOr, + makeBinaryOp( + sbe::EPrimBinary::lessEq, + makeVariable(*minTsSlot), + makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())), + makeBinaryOp( + sbe::EPrimBinary::logicAnd, + makeBinaryOp( + sbe::EPrimBinary::eq, makeVariable(opTypeSlot), makeConstant("n")), + makeBinaryOp(sbe::EPrimBinary::logicAnd, + makeFunction("isObject", makeVariable(oObjSlot)), + makeBinaryOp(sbe::EPrimBinary::eq, + makeFunction("getField", + makeVariable(oObjSlot), + makeConstant("msg")), + makeConstant(repl::kInitiatingSetMsg))))), + makeConstant(sbe::value::TypeTags::Boolean, false), + sbe::makeE<sbe::EFail>(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog")), + csn->nodeId()); + + // All branches of the UnionStage must have the same number of input and output slots, and + // we want to remap all slots from the basic scan we constructed earlier through the union + // stage to the output. We're lucky that the real scan happens to have the same number of + // slots (resultSlot, recordSlot, tsSlot) as the minTs check branch (minTsSlot, opTypeSlot, + // oObjSlot), so we don't have to compensate with any unused slots. Note that the minTsSlots + // will never be mapped to output in practice, since the minTs branch either throws or EOFs. + // + // We also need to update the local variables for each slot to their remapped values, so + // subsequent subtrees constructed by this function refer to the correct post-union slots. + auto realSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot); + resultSlot = slotIdGenerator->generate(); + recordIdSlot = slotIdGenerator->generate(); + tsSlot = slotIdGenerator->generate(); + auto outputSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot); + + // Create the union stage. The left branch, which runs first, is our resumability check. + stage = sbe::makeS<sbe::UnionStage>( + makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(minTsBranch), std::move(stage)), + makeVector<sbe::value::SlotVector>(std::move(minTsSlots), std::move(realSlots)), + std::move(outputSlots), + csn->nodeId()); + } + // Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater // than the upper bound. if (csn->maxTs) { @@ -210,10 +314,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo stage = sbe::makeS<sbe::FilterStage<false, true>>( std::move(stage), makeBinaryOp(sbe::EPrimBinary::lessEq, - sbe::makeE<sbe::EVariable>(*tsSlot), - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::Timestamp, - sbe::value::bitcastFrom<uint64_t>((*csn->maxTs).asULL()))), + makeVariable(*tsSlot), + makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())), csn->nodeId()); } @@ -226,16 +328,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo relevantSlots.push_back(*tsSlot); } - auto [_, filterStage] = generateFilter(opCtx, - csn->filter.get(), - std::move(stage), - slotIdGenerator, - frameIdGenerator, - resultSlot, - env, - std::move(relevantSlots), - csn->nodeId()); - stage = std::move(filterStage); + std::tie(std::ignore, stage) = generateFilter(opCtx, + csn->filter.get(), + std::move(stage), + slotIdGenerator, + frameIdGenerator, + resultSlot, + env, + std::move(relevantSlots), + csn->nodeId()); // We may be requested to stop applying the filter after the first match. This can happen // if the query is just a lower bound on 'ts' on a forward scan. In this case every document @@ -366,9 +467,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), csn->nodeId(), seekSlot, - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::RecordId, - sbe::value::bitcastFrom<int64_t>(csn->resumeAfterRecordId->asLong()))); + makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->asLong())); // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, // the branch will output the 'seekSlot' to start the real scan from, otherwise it will @@ -437,16 +536,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc relevantSlots.push_back(*tsSlot); } - auto [_, filterStage] = generateFilter(opCtx, - csn->filter.get(), - std::move(stage), - slotIdGenerator, - frameIdGenerator, - resultSlot, - env, - std::move(relevantSlots), - csn->nodeId()); - stage = std::move(filterStage); + std::tie(std::ignore, stage) = generateFilter(opCtx, + csn->filter.get(), + std::move(stage), + slotIdGenerator, + frameIdGenerator, + resultSlot, + env, + std::move(relevantSlots), + csn->nodeId()); } PlanStageSlots outputs; |