diff options
-rw-r--r-- | jstests/noPassthrough/change_streams_oplog_rollover.js | 136 | ||||
-rw-r--r-- | jstests/noPassthrough/oplog_rollover_agg.js | 134 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/query/planner_access.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 158 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_helpers.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_helpers.h | 3 |
7 files changed, 417 insertions, 35 deletions
diff --git a/jstests/noPassthrough/change_streams_oplog_rollover.js b/jstests/noPassthrough/change_streams_oplog_rollover.js new file mode 100644 index 00000000000..5becd1f25d8 --- /dev/null +++ b/jstests/noPassthrough/change_streams_oplog_rollover.js @@ -0,0 +1,136 @@ +// Tests the behaviour of change streams on an oplog which rolls over. +// @tags: [ +// requires_find_command, +// requires_journaling, +// requires_majority_read_concern, +// uses_change_streams, +// ] +(function() { +"use strict"; + +load('jstests/replsets/rslib.js'); // For getLatestOp. +load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. + +const oplogSize = 1; // size in MB +const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize}); + +rst.startSet(); +rst.initiate(); + +const testDB = rst.getPrimary().getDB(jsTestName()); +const testColl = testDB[jsTestName()]; + +const cst = new ChangeStreamTest(testDB); + +// Write a document to the test collection. +assert.commandWorked(testColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + +let changeStream = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: testColl.getName(), includeToken: true}); + +// We awaited the replication of the insert, so the change stream shouldn't return them. +assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}})); + +// Record current time to resume a change stream later in the test. +const resumeTimeFirstUpdate = testDB.runCommand({hello: 1}).$clusterTime.clusterTime; + +assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}})); + +// Test that we see the the update, and remember its resume tokens. +let next = cst.getOneChange(changeStream); +assert.eq(next.operationType, "update"); +assert.eq(next.documentKey._id, 1); +const resumeTokenFromFirstUpdate = next._id; + +// Write some additional documents, then test we can resume after the first update. +assert.commandWorked(testColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); +assert.commandWorked(testColl.insert({_id: 3}, {writeConcern: {w: "majority"}})); + +changeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], + aggregateOptions: {cursor: {batchSize: 0}}, + collection: testColl.getName() +}); + +for (let nextExpectedId of [2, 3]) { + assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId); +} + +// Test that the change stream can see additional inserts into the collection. +assert.commandWorked(testColl.insert({_id: 4}, {writeConcern: {w: "majority"}})); +assert.commandWorked(testColl.insert({_id: 5}, {writeConcern: {w: "majority"}})); + +for (let nextExpectedId of [4, 5]) { + assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId); +} + +// Confirm that we can begin a stream at a timestamp that precedes the start of the oplog, if +// the first entry in the oplog is the replica set initialization message. +const firstOplogEntry = + testDB.getSiblingDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).toArray()[0]; +assert.eq(firstOplogEntry.o.msg, "initiating set"); +assert.eq(firstOplogEntry.op, "n"); + +const startAtDawnOfTimeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}], + aggregateOptions: {cursor: {batchSize: 0}}, + collection: testColl.getName() +}); + +// The first entry we see should be the initial insert into the collection. +const firstStreamEntry = cst.getOneChange(startAtDawnOfTimeStream); +assert.eq(firstStreamEntry.operationType, "insert"); +assert.eq(firstStreamEntry.documentKey._id, 1); + +// Test that the stream can't resume if the resume token is no longer present in the oplog. + +// Roll over the entire oplog such that none of the events are still present. +const primaryNode = rst.getPrimary(); +const mostRecentOplogEntry = getLatestOp(primaryNode); +assert.neq(mostRecentOplogEntry, null); +const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi'); + +function oplogIsRolledOver() { + // The oplog has rolled over if the op that used to be newest is now older than the + // oplog's current oldest entry. Said another way, the oplog is rolled over when + // everything in the oplog is newer than what used to be the newest entry. + return bsonWoCompare(mostRecentOplogEntry.ts, + getLeastRecentOp({server: primaryNode, readConcern: "majority"}).ts) < 0; +} + +while (!oplogIsRolledOver()) { + assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}})); +} + +// Confirm that attempting to continue reading an existing change stream throws CappedPositionLost. +assert.commandFailedWithCode( + testDB.runCommand({getMore: startAtDawnOfTimeStream.id, collection: testColl.getName()}), + ErrorCodes.CappedPositionLost); + +// Now confirm that attempting to resumeAfter or startAtOperationTime fails. +ChangeStreamTest.assertChangeStreamThrowsCode({ + db: testDB, + collName: testColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], + expectedCode: ErrorCodes.ChangeStreamHistoryLost +}); + +ChangeStreamTest.assertChangeStreamThrowsCode({ + db: testDB, + collName: testColl.getName(), + pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}], + expectedCode: ErrorCodes.ChangeStreamHistoryLost +}); + +// We also can't start a stream from the "dawn of time" any more, since the first entry in the +// oplog is no longer the replica set initialization message. +ChangeStreamTest.assertChangeStreamThrowsCode({ + db: testDB, + collName: testColl.getName(), + pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}], + expectedCode: ErrorCodes.ChangeStreamHistoryLost +}); + +cst.cleanUp(); +rst.stopSet(); +})(); diff --git a/jstests/noPassthrough/oplog_rollover_agg.js b/jstests/noPassthrough/oplog_rollover_agg.js new file mode 100644 index 00000000000..c41ebb2fedd --- /dev/null +++ b/jstests/noPassthrough/oplog_rollover_agg.js @@ -0,0 +1,134 @@ +// Tests the behaviour of an agg with $_requestReshardingResumeToken on an oplog which rolls over. +// @tags: [ +// requires_find_command, +// requires_journaling, +// requires_majority_read_concern, +// ] +(function() { +"use strict"; + +load('jstests/replsets/rslib.js'); // For getLatestOp. + +const oplogSize = 1; // size in MB +const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize}); + +rst.startSet(); +rst.initiate(); + +const testDB = rst.getPrimary().getDB(jsTestName()); +const testColl = testDB[jsTestName()]; + +const localDB = testDB.getSiblingDB("local"); +const oplogColl = localDB.oplog.rs; + +// Insert one document into the test collection. +const insertCmdRes = assert.commandWorked(testDB.runCommand( + {insert: testColl.getName(), documents: [{_id: 1}], writeConcern: {w: "majority"}})); + +// Record the optime of the insert to resume from later in the test. +const resumeTimeFirstInsert = insertCmdRes.operationTime; + +// Update the document to create another oplog entry. +assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}})); + +// Verify that an aggregation which requests a resharding resume token but does not include a filter +// on 'ts' is rejected. +assert.commandFailedWithCode(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ns: testColl.getFullName()}}], + cursor: {}, + $_requestReshardingResumeToken: true +}), + ErrorCodes.InvalidOptions); + +// Verify that we can start an aggregation from the timestamp that we took earlier, and that we see +// the subsequent update operation. +let aggCmdRes = assert.commandWorked(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ts: {$gt: resumeTimeFirstInsert}, ns: testColl.getFullName()}}], + cursor: {}, + $_requestReshardingResumeToken: true +})); + +const aggCmdCursor = new DBCommandCursor(localDB, aggCmdRes); +assert.soon(() => aggCmdCursor.hasNext()); +let next = aggCmdCursor.next(); +assert.eq(next.op, "u"); +assert.eq(next.o2._id, 1); + +// Confirm that we can begin an aggregation at a timestamp that precedes the start of the oplog, if +// the first entry in the oplog is the replica set initialization message. +const firstOplogEntry = + testDB.getSiblingDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).toArray()[0]; +assert.eq(firstOplogEntry.o.msg, "initiating set"); +assert.eq(firstOplogEntry.op, "n"); + +aggCmdRes = assert.commandWorked(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}], + cursor: {}, + $_requestReshardingResumeToken: true +})); +const startAtDawnOfTimeCursor = new DBCommandCursor(localDB, aggCmdRes); + +for (let expectedOp of [{op: "i", _id: 1}, {op: "u", _id: 1}]) { + assert.soon(() => startAtDawnOfTimeCursor.hasNext()); + next = startAtDawnOfTimeCursor.next(); + assert.eq(next.op, expectedOp.op); + assert.eq((next.o._id || next.o2._id), expectedOp._id); +} + +// Roll over the entire oplog such that none of the events are still present. +const primaryNode = rst.getPrimary(); +const mostRecentOplogEntry = getLatestOp(primaryNode); +assert.neq(mostRecentOplogEntry, null); +const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi'); + +function oplogIsRolledOver() { + // The oplog has rolled over if the op that used to be newest is now older than the + // oplog's current oldest entry. Said another way, the oplog is rolled over when + // everything in the oplog is newer than what used to be the newest entry. + return bsonWoCompare(mostRecentOplogEntry.ts, + getLeastRecentOp({server: primaryNode, readConcern: "majority"}).ts) < 0; +} + +while (!oplogIsRolledOver()) { + assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}})); +} + +// Test that attempting to start from a timestamp that has already fallen off the oplog fails, if we +// specify $_requestReshardingResumeToken. +assert.commandFailedWithCode(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ts: {$gte: resumeTimeFirstInsert}, ns: testColl.getFullName()}}], + cursor: {}, + $_requestReshardingResumeToken: true +}), + ErrorCodes.OplogQueryMinTsMissing); + +assert.commandFailedWithCode(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}], + cursor: {}, + $_requestReshardingResumeToken: true +}), + ErrorCodes.OplogQueryMinTsMissing); + +// However, the same aggregation succeeds if we do not specify $_requestReshardingResumeToken. +assert.commandWorked(localDB.runCommand({ + aggregate: oplogColl.getName(), + pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}], + cursor: {}, +})); + +// Requesting resume tokens on a find command does not imply 'assertMinTsHasNotFallenOffOplog'. +assert.commandWorked(localDB.runCommand({ + find: oplogColl.getName(), + filter: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}, + tailable: true, + hint: {$natural: 1}, + $_requestResumeToken: true +})); + +rst.stopSet(); +})(); diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 559f6cc360a..237da29c8a2 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -79,13 +79,11 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, } invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); - // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can be + // incorrectly requested by the user, but in that case we should already have uasserted by now. if (params.assertMinTsHasNotFallenOffOplog) { invariant(params.shouldTrackLatestOplogTimestamp); - uassert(ErrorCodes::InvalidOptions, - str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query " - "which does not imply a minimum 'ts' value ", - params.minTs); + invariant(params.minTs); } if (params.resumeAfterRecordId) { diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index bba4d16af2f..2f66961a084 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -273,6 +273,13 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( } } + // The user may have requested 'assertMinTsHasNotFallenOffOplog' for a query that does not + // specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions. + uassert(ErrorCodes::InvalidOptions, + str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query " + "which does not imply a minimum 'ts' value ", + !(csn->assertMinTsHasNotFallenOffOplog && !csn->minTs)); + return csn; } diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index c277107c21b..252a5be9ccf 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -190,9 +190,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), csn->nodeId(), *seekRecordIdSlot, - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::RecordId, - sbe::value::bitcastFrom<int64_t>(seekRecordId->asLong()))), + makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->asLong())), std::move(stage), sbe::makeSV(), sbe::makeSV(*seekRecordIdSlot), @@ -200,6 +198,112 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo csn->nodeId()); } + // Create a filter which checks the first document to ensure either that its 'ts' is less than + // or equal to minTs, or that it is a replica set initialization message. If this fails, then we + // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch + // of a tailable scan; it only needs to be done once, when the initial branch is run. + if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) { + // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can + // be incorrectly requested by the user, but in that case we should already have uasserted. + invariant(csn->shouldTrackLatestOplogTimestamp); + invariant(csn->minTs); + + // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot' + // here so that it does not shadow the 'tsSlot' which we allocated earlier. + auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded( + collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp); + + // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot' + // already allocated for the existing scan that we created previously. + invariant(minTsSlot); + invariant(tsSlot); + + // Our filter will also need to see the 'op' and 'o.msg' fields. + auto opTypeSlot = slotIdGenerator->generate(); + auto oObjSlot = slotIdGenerator->generate(); + minTsSlots.push_back(opTypeSlot); + minTsSlots.push_back(oObjSlot); + fields.push_back("op"); + fields.push_back("o"); + + // If the first entry we see in the oplog is the replset initialization, then it doesn't + // matter if its timestamp is later than the specified minTs; no events earlier than the + // minTs can have fallen off this oplog. Otherwise, we must verify that the timestamp of the + // first observed oplog entry is earlier than or equal to the minTs time. + // + // To achieve this, we build a two-branch union subtree. The left branch is a scan with a + // filter that checks the first entry in the oplog for the above criteria, throws via EFail + // if they are not met, and EOFs otherwise. The right branch of the union plan is the tree + // that we originally built above. + // + // union [s9, s10, s11] [ + // [s6, s7, s8] efilter {if (ts <= minTs || op == "n" && isObject (o) && + // getField (o, "msg") == "initiating set", false, fail ( 326 ))} + // scan [s6 = ts, s7 = op, s8 = o] @oplog, + // <stage> + + // Set up the filter stage to be used in the left branch of the union. If the main body of + // the expression does not match the input document, it throws OplogQueryMinTsMissing. If + // the expression does match, then it returns 'false', which causes the filter (and as a + // result, the branch) to EOF immediately. Note that the resultSlot and recordIdSlot + // arguments to the ScanStage are boost::none, as we do not need them. + auto minTsBranch = sbe::makeS<sbe::FilterStage<false, true>>( + sbe::makeS<sbe::ScanStage>(nss, + boost::none, + boost::none, + std::move(fields), + minTsSlots, /* don't move this */ + boost::none, + true /* forward */, + yieldPolicy, + csn->nodeId(), + lockAcquisitionCallback), + sbe::makeE<sbe::EIf>( + makeBinaryOp( + sbe::EPrimBinary::logicOr, + makeBinaryOp( + sbe::EPrimBinary::lessEq, + makeVariable(*minTsSlot), + makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())), + makeBinaryOp( + sbe::EPrimBinary::logicAnd, + makeBinaryOp( + sbe::EPrimBinary::eq, makeVariable(opTypeSlot), makeConstant("n")), + makeBinaryOp(sbe::EPrimBinary::logicAnd, + makeFunction("isObject", makeVariable(oObjSlot)), + makeBinaryOp(sbe::EPrimBinary::eq, + makeFunction("getField", + makeVariable(oObjSlot), + makeConstant("msg")), + makeConstant(repl::kInitiatingSetMsg))))), + makeConstant(sbe::value::TypeTags::Boolean, false), + sbe::makeE<sbe::EFail>(ErrorCodes::OplogQueryMinTsMissing, + "Specified minTs has already fallen off the oplog")), + csn->nodeId()); + + // All branches of the UnionStage must have the same number of input and output slots, and + // we want to remap all slots from the basic scan we constructed earlier through the union + // stage to the output. We're lucky that the real scan happens to have the same number of + // slots (resultSlot, recordSlot, tsSlot) as the minTs check branch (minTsSlot, opTypeSlot, + // oObjSlot), so we don't have to compensate with any unused slots. Note that the minTsSlots + // will never be mapped to output in practice, since the minTs branch either throws or EOFs. + // + // We also need to update the local variables for each slot to their remapped values, so + // subsequent subtrees constructed by this function refer to the correct post-union slots. + auto realSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot); + resultSlot = slotIdGenerator->generate(); + recordIdSlot = slotIdGenerator->generate(); + tsSlot = slotIdGenerator->generate(); + auto outputSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot); + + // Create the union stage. The left branch, which runs first, is our resumability check. + stage = sbe::makeS<sbe::UnionStage>( + makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(minTsBranch), std::move(stage)), + makeVector<sbe::value::SlotVector>(std::move(minTsSlots), std::move(realSlots)), + std::move(outputSlots), + csn->nodeId()); + } + // Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater // than the upper bound. if (csn->maxTs) { @@ -210,10 +314,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo stage = sbe::makeS<sbe::FilterStage<false, true>>( std::move(stage), makeBinaryOp(sbe::EPrimBinary::lessEq, - sbe::makeE<sbe::EVariable>(*tsSlot), - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::Timestamp, - sbe::value::bitcastFrom<uint64_t>((*csn->maxTs).asULL()))), + makeVariable(*tsSlot), + makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())), csn->nodeId()); } @@ -226,16 +328,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo relevantSlots.push_back(*tsSlot); } - auto [_, filterStage] = generateFilter(opCtx, - csn->filter.get(), - std::move(stage), - slotIdGenerator, - frameIdGenerator, - resultSlot, - env, - std::move(relevantSlots), - csn->nodeId()); - stage = std::move(filterStage); + std::tie(std::ignore, stage) = generateFilter(opCtx, + csn->filter.get(), + std::move(stage), + slotIdGenerator, + frameIdGenerator, + resultSlot, + env, + std::move(relevantSlots), + csn->nodeId()); // We may be requested to stop applying the filter after the first match. This can happen // if the query is just a lower bound on 'ts' on a forward scan. In this case every document @@ -366,9 +467,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()), csn->nodeId(), seekSlot, - sbe::makeE<sbe::EConstant>( - sbe::value::TypeTags::RecordId, - sbe::value::bitcastFrom<int64_t>(csn->resumeAfterRecordId->asLong()))); + makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->asLong())); // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, // the branch will output the 'seekSlot' to start the real scan from, otherwise it will @@ -437,16 +536,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc relevantSlots.push_back(*tsSlot); } - auto [_, filterStage] = generateFilter(opCtx, - csn->filter.get(), - std::move(stage), - slotIdGenerator, - frameIdGenerator, - resultSlot, - env, - std::move(relevantSlots), - csn->nodeId()); - stage = std::move(filterStage); + std::tie(std::ignore, stage) = generateFilter(opCtx, + csn->filter.get(), + std::move(stage), + slotIdGenerator, + frameIdGenerator, + resultSlot, + env, + std::move(relevantSlots), + csn->nodeId()); } PlanStageSlots outputs; diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp index b103ec7dc5b..86c5bad1eb2 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp +++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp @@ -203,6 +203,12 @@ std::unique_ptr<sbe::EExpression> makeFillEmptyFalse(std::unique_ptr<sbe::EExpre sbe::value::bitcastFrom<bool>(false))); } +std::unique_ptr<sbe::EExpression> makeVariable(sbe::value::SlotId slotId, + boost::optional<sbe::FrameId> frameId) { + return frameId ? sbe::makeE<sbe::EVariable>(*frameId, slotId) + : sbe::makeE<sbe::EVariable>(slotId); +} + std::unique_ptr<sbe::EExpression> makeFillEmptyNull(std::unique_ptr<sbe::EExpression> e) { using namespace std::literals; return makeFunction( diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h index 117de1dfcae..d51a56f1902 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.h +++ b/src/mongo/db/query/sbe_stage_builder_helpers.h @@ -202,6 +202,9 @@ inline auto makeConstant(std::string_view str) { return sbe::makeE<sbe::EConstant>(tag, value); } +std::unique_ptr<sbe::EExpression> makeVariable(sbe::value::SlotId slotId, + boost::optional<sbe::FrameId> frameId = {}); + /** * Check if expression returns Nothing and return null if so. Otherwise, return the * expression. |