summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2021-01-21 23:06:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-15 16:18:24 +0000
commit394572f340bd06f5306a2e961cd9d4e0d3e96bb1 (patch)
tree85d6b30dcb8e142676192da192f7df70d27f58b7
parent16aff18cbb6b993ac325a50a59b1898d35cf08c3 (diff)
downloadmongo-394572f340bd06f5306a2e961cd9d4e0d3e96bb1.tar.gz
SERVER-50580 SBE should obey ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG flag
-rw-r--r--jstests/noPassthrough/change_streams_oplog_rollover.js136
-rw-r--r--jstests/noPassthrough/oplog_rollover_agg.js134
-rw-r--r--src/mongo/db/exec/collection_scan.cpp8
-rw-r--r--src/mongo/db/query/planner_access.cpp7
-rw-r--r--src/mongo/db/query/sbe_stage_builder_coll_scan.cpp158
-rw-r--r--src/mongo/db/query/sbe_stage_builder_helpers.cpp6
-rw-r--r--src/mongo/db/query/sbe_stage_builder_helpers.h3
7 files changed, 417 insertions, 35 deletions
diff --git a/jstests/noPassthrough/change_streams_oplog_rollover.js b/jstests/noPassthrough/change_streams_oplog_rollover.js
new file mode 100644
index 00000000000..5becd1f25d8
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_oplog_rollover.js
@@ -0,0 +1,136 @@
+// Tests the behaviour of change streams on an oplog which rolls over.
+// @tags: [
+// requires_find_command,
+// requires_journaling,
+// requires_majority_read_concern,
+// uses_change_streams,
+// ]
+(function() {
+"use strict";
+
+load('jstests/replsets/rslib.js'); // For getLatestOp.
+load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest.
+
+const oplogSize = 1; // size in MB
+const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize});
+
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB(jsTestName());
+const testColl = testDB[jsTestName()];
+
+const cst = new ChangeStreamTest(testDB);
+
+// Write a document to the test collection.
+assert.commandWorked(testColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+let changeStream = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: testColl.getName(), includeToken: true});
+
+// We awaited the replication of the insert, so the change stream shouldn't return it.
+assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));
+
+// Record current time to resume a change stream later in the test.
+const resumeTimeFirstUpdate = testDB.runCommand({hello: 1}).$clusterTime.clusterTime;
+
+assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));
+
+// Test that we see the update, and remember its resume token.
+let next = cst.getOneChange(changeStream);
+assert.eq(next.operationType, "update");
+assert.eq(next.documentKey._id, 1);
+const resumeTokenFromFirstUpdate = next._id;
+
+// Write some additional documents, then test we can resume after the first update.
+assert.commandWorked(testColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+assert.commandWorked(testColl.insert({_id: 3}, {writeConcern: {w: "majority"}}));
+
+changeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ aggregateOptions: {cursor: {batchSize: 0}},
+ collection: testColl.getName()
+});
+
+for (let nextExpectedId of [2, 3]) {
+ assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId);
+}
+
+// Test that the change stream can see additional inserts into the collection.
+assert.commandWorked(testColl.insert({_id: 4}, {writeConcern: {w: "majority"}}));
+assert.commandWorked(testColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+for (let nextExpectedId of [4, 5]) {
+ assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId);
+}
+
+// Confirm that we can begin a stream at a timestamp that precedes the start of the oplog, if
+// the first entry in the oplog is the replica set initialization message.
+const firstOplogEntry =
+ testDB.getSiblingDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).toArray()[0];
+assert.eq(firstOplogEntry.o.msg, "initiating set");
+assert.eq(firstOplogEntry.op, "n");
+
+const startAtDawnOfTimeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}],
+ aggregateOptions: {cursor: {batchSize: 0}},
+ collection: testColl.getName()
+});
+
+// The first entry we see should be the initial insert into the collection.
+const firstStreamEntry = cst.getOneChange(startAtDawnOfTimeStream);
+assert.eq(firstStreamEntry.operationType, "insert");
+assert.eq(firstStreamEntry.documentKey._id, 1);
+
+// Test that the stream can't resume if the resume token is no longer present in the oplog.
+
+// Roll over the entire oplog such that none of the events are still present.
+const primaryNode = rst.getPrimary();
+const mostRecentOplogEntry = getLatestOp(primaryNode);
+assert.neq(mostRecentOplogEntry, null);
+const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+
+function oplogIsRolledOver() {
+ // The oplog has rolled over if the op that used to be newest is now older than the
+ // oplog's current oldest entry. Said another way, the oplog is rolled over when
+ // everything in the oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(mostRecentOplogEntry.ts,
+ getLeastRecentOp({server: primaryNode, readConcern: "majority"}).ts) < 0;
+}
+
+while (!oplogIsRolledOver()) {
+ assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+}
+
+// Confirm that attempting to continue reading an existing change stream throws CappedPositionLost.
+assert.commandFailedWithCode(
+ testDB.runCommand({getMore: startAtDawnOfTimeStream.id, collection: testColl.getName()}),
+ ErrorCodes.CappedPositionLost);
+
+// Now confirm that attempting to resumeAfter or startAtOperationTime fails.
+ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: testDB,
+ collName: testColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
+});
+
+ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: testDB,
+ collName: testColl.getName(),
+ pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}],
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
+});
+
+// We also can't start a stream from the "dawn of time" any more, since the first entry in the
+// oplog is no longer the replica set initialization message.
+ChangeStreamTest.assertChangeStreamThrowsCode({
+ db: testDB,
+ collName: testColl.getName(),
+ pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}],
+ expectedCode: ErrorCodes.ChangeStreamHistoryLost
+});
+
+cst.cleanUp();
+rst.stopSet();
+})();
diff --git a/jstests/noPassthrough/oplog_rollover_agg.js b/jstests/noPassthrough/oplog_rollover_agg.js
new file mode 100644
index 00000000000..c41ebb2fedd
--- /dev/null
+++ b/jstests/noPassthrough/oplog_rollover_agg.js
@@ -0,0 +1,134 @@
+// Tests the behaviour of an agg with $_requestReshardingResumeToken on an oplog which rolls over.
+// @tags: [
+// requires_find_command,
+// requires_journaling,
+// requires_majority_read_concern,
+// ]
+(function() {
+"use strict";
+
+load('jstests/replsets/rslib.js'); // For getLatestOp.
+
+const oplogSize = 1; // size in MB
+const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize});
+
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB(jsTestName());
+const testColl = testDB[jsTestName()];
+
+const localDB = testDB.getSiblingDB("local");
+const oplogColl = localDB.oplog.rs;
+
+// Insert one document into the test collection.
+const insertCmdRes = assert.commandWorked(testDB.runCommand(
+ {insert: testColl.getName(), documents: [{_id: 1}], writeConcern: {w: "majority"}}));
+
+// Record the optime of the insert to resume from later in the test.
+const resumeTimeFirstInsert = insertCmdRes.operationTime;
+
+// Update the document to create another oplog entry.
+assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));
+
+// Verify that an aggregation which requests a resharding resume token but does not include a filter
+// on 'ts' is rejected.
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.InvalidOptions);
+
+// Verify that we can start an aggregation from the timestamp that we took earlier, and that we see
+// the subsequent update operation.
+let aggCmdRes = assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gt: resumeTimeFirstInsert}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}));
+
+const aggCmdCursor = new DBCommandCursor(localDB, aggCmdRes);
+assert.soon(() => aggCmdCursor.hasNext());
+let next = aggCmdCursor.next();
+assert.eq(next.op, "u");
+assert.eq(next.o2._id, 1);
+
+// Confirm that we can begin an aggregation at a timestamp that precedes the start of the oplog, if
+// the first entry in the oplog is the replica set initialization message.
+const firstOplogEntry =
+ testDB.getSiblingDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).toArray()[0];
+assert.eq(firstOplogEntry.o.msg, "initiating set");
+assert.eq(firstOplogEntry.op, "n");
+
+aggCmdRes = assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}));
+const startAtDawnOfTimeCursor = new DBCommandCursor(localDB, aggCmdRes);
+
+for (let expectedOp of [{op: "i", _id: 1}, {op: "u", _id: 1}]) {
+ assert.soon(() => startAtDawnOfTimeCursor.hasNext());
+ next = startAtDawnOfTimeCursor.next();
+ assert.eq(next.op, expectedOp.op);
+ assert.eq((next.o._id || next.o2._id), expectedOp._id);
+}
+
+// Roll over the entire oplog such that none of the events are still present.
+const primaryNode = rst.getPrimary();
+const mostRecentOplogEntry = getLatestOp(primaryNode);
+assert.neq(mostRecentOplogEntry, null);
+const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+
+function oplogIsRolledOver() {
+ // The oplog has rolled over if the op that used to be newest is now older than the
+ // oplog's current oldest entry. Said another way, the oplog is rolled over when
+ // everything in the oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(mostRecentOplogEntry.ts,
+ getLeastRecentOp({server: primaryNode, readConcern: "majority"}).ts) < 0;
+}
+
+while (!oplogIsRolledOver()) {
+ assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+}
+
+// Test that attempting to start from a timestamp that has already fallen off the oplog fails, if we
+// specify $_requestReshardingResumeToken.
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: resumeTimeFirstInsert}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.OplogQueryMinTsMissing);
+
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.OplogQueryMinTsMissing);
+
+// However, the same aggregation succeeds if we do not specify $_requestReshardingResumeToken.
+assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+}));
+
+// Requesting resume tokens on a find command does not imply 'assertMinTsHasNotFallenOffOplog'.
+assert.commandWorked(localDB.runCommand({
+ find: oplogColl.getName(),
+ filter: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()},
+ tailable: true,
+ hint: {$natural: 1},
+ $_requestResumeToken: true
+}));
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 559f6cc360a..237da29c8a2 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -79,13 +79,11 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
}
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
- // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present.
+ // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can be
+ // incorrectly requested by the user, but in that case we should already have uasserted by now.
if (params.assertMinTsHasNotFallenOffOplog) {
invariant(params.shouldTrackLatestOplogTimestamp);
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query "
- "which does not imply a minimum 'ts' value ",
- params.minTs);
+ invariant(params.minTs);
}
if (params.resumeAfterRecordId) {
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index bba4d16af2f..2f66961a084 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -273,6 +273,13 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
}
}
+ // The user may have requested 'assertMinTsHasNotFallenOffOplog' for a query that does not
+ // specify a minimum timestamp. This is not a valid request, so we throw InvalidOptions.
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "assertMinTsHasNotFallenOffOplog cannot be applied to a query "
+ "which does not imply a minimum 'ts' value ",
+ !(csn->assertMinTsHasNotFallenOffOplog && !csn->minTs));
+
return csn;
}
diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
index c277107c21b..252a5be9ccf 100644
--- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp
@@ -190,9 +190,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
csn->nodeId(),
*seekRecordIdSlot,
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::RecordId,
- sbe::value::bitcastFrom<int64_t>(seekRecordId->asLong()))),
+ makeConstant(sbe::value::TypeTags::RecordId, seekRecordId->asLong())),
std::move(stage),
sbe::makeSV(),
sbe::makeSV(*seekRecordIdSlot),
@@ -200,6 +198,112 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
csn->nodeId());
}
+ // Create a filter which checks the first document to ensure either that its 'ts' is less than
+ // or equal to minTs, or that it is a replica set initialization message. If this fails, then we
+ // throw ErrorCodes::OplogQueryMinTsMissing. We avoid doing this check on the resumable branch
+ // of a tailable scan; it only needs to be done once, when the initial branch is run.
+ if (csn->assertMinTsHasNotFallenOffOplog && !isTailableResumeBranch) {
+ // We should never see 'assertMinTsHasNotFallenOffOplog' if 'minTS' is not present. This can
+ // be incorrectly requested by the user, but in that case we should already have uasserted.
+ invariant(csn->shouldTrackLatestOplogTimestamp);
+ invariant(csn->minTs);
+
+ // We will be constructing a filter that needs to see the 'ts' field. We name it 'minTsSlot'
+ // here so that it does not shadow the 'tsSlot' which we allocated earlier.
+ auto&& [fields, minTsSlots, minTsSlot] = makeOplogTimestampSlotsIfNeeded(
+ collection, slotIdGenerator, csn->shouldTrackLatestOplogTimestamp);
+
+ // We should always have allocated a 'minTsSlot', and there should always be a 'tsSlot'
+ // already allocated for the existing scan that we created previously.
+ invariant(minTsSlot);
+ invariant(tsSlot);
+
+ // Our filter will also need to see the 'op' and 'o.msg' fields.
+ auto opTypeSlot = slotIdGenerator->generate();
+ auto oObjSlot = slotIdGenerator->generate();
+ minTsSlots.push_back(opTypeSlot);
+ minTsSlots.push_back(oObjSlot);
+ fields.push_back("op");
+ fields.push_back("o");
+
+ // If the first entry we see in the oplog is the replset initialization, then it doesn't
+ // matter if its timestamp is later than the specified minTs; no events earlier than the
+ // minTs can have fallen off this oplog. Otherwise, we must verify that the timestamp of the
+ // first observed oplog entry is earlier than or equal to the minTs time.
+ //
+ // To achieve this, we build a two-branch union subtree. The left branch is a scan with a
+ // filter that checks the first entry in the oplog for the above criteria, throws via EFail
+ // if they are not met, and EOFs otherwise. The right branch of the union plan is the tree
+ // that we originally built above.
+ //
+ // union [s9, s10, s11] [
+ // [s6, s7, s8] efilter {if (ts <= minTs || op == "n" && isObject (o) &&
+ // getField (o, "msg") == "initiating set", false, fail ( 326 ))}
+ // scan [s6 = ts, s7 = op, s8 = o] @oplog,
+ // <stage>
+
+ // Set up the filter stage to be used in the left branch of the union. If the main body of
+ // the expression does not match the input document, it throws OplogQueryMinTsMissing. If
+ // the expression does match, then it returns 'false', which causes the filter (and as a
+ // result, the branch) to EOF immediately. Note that the resultSlot and recordIdSlot
+ // arguments to the ScanStage are boost::none, as we do not need them.
+ auto minTsBranch = sbe::makeS<sbe::FilterStage<false, true>>(
+ sbe::makeS<sbe::ScanStage>(nss,
+ boost::none,
+ boost::none,
+ std::move(fields),
+ minTsSlots, /* don't move this */
+ boost::none,
+ true /* forward */,
+ yieldPolicy,
+ csn->nodeId(),
+ lockAcquisitionCallback),
+ sbe::makeE<sbe::EIf>(
+ makeBinaryOp(
+ sbe::EPrimBinary::logicOr,
+ makeBinaryOp(
+ sbe::EPrimBinary::lessEq,
+ makeVariable(*minTsSlot),
+ makeConstant(sbe::value::TypeTags::Timestamp, csn->minTs->asULL())),
+ makeBinaryOp(
+ sbe::EPrimBinary::logicAnd,
+ makeBinaryOp(
+ sbe::EPrimBinary::eq, makeVariable(opTypeSlot), makeConstant("n")),
+ makeBinaryOp(sbe::EPrimBinary::logicAnd,
+ makeFunction("isObject", makeVariable(oObjSlot)),
+ makeBinaryOp(sbe::EPrimBinary::eq,
+ makeFunction("getField",
+ makeVariable(oObjSlot),
+ makeConstant("msg")),
+ makeConstant(repl::kInitiatingSetMsg))))),
+ makeConstant(sbe::value::TypeTags::Boolean, false),
+ sbe::makeE<sbe::EFail>(ErrorCodes::OplogQueryMinTsMissing,
+ "Specified minTs has already fallen off the oplog")),
+ csn->nodeId());
+
+ // All branches of the UnionStage must have the same number of input and output slots, and
+ // we want to remap all slots from the basic scan we constructed earlier through the union
+ // stage to the output. We're lucky that the real scan happens to have the same number of
+ // slots (resultSlot, recordSlot, tsSlot) as the minTs check branch (minTsSlot, opTypeSlot,
+ // oObjSlot), so we don't have to compensate with any unused slots. Note that the minTsSlots
+ // will never be mapped to output in practice, since the minTs branch either throws or EOFs.
+ //
+ // We also need to update the local variables for each slot to their remapped values, so
+ // subsequent subtrees constructed by this function refer to the correct post-union slots.
+ auto realSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot);
+ resultSlot = slotIdGenerator->generate();
+ recordIdSlot = slotIdGenerator->generate();
+ tsSlot = slotIdGenerator->generate();
+ auto outputSlots = sbe::makeSV(resultSlot, recordIdSlot, *tsSlot);
+
+ // Create the union stage. The left branch, which runs first, is our resumability check.
+ stage = sbe::makeS<sbe::UnionStage>(
+ makeVector<std::unique_ptr<sbe::PlanStage>>(std::move(minTsBranch), std::move(stage)),
+ makeVector<sbe::value::SlotVector>(std::move(minTsSlots), std::move(realSlots)),
+ std::move(outputSlots),
+ csn->nodeId());
+ }
+
// Add an EOF filter to stop the scan after we fetch the first document that has 'ts' greater
// than the upper bound.
if (csn->maxTs) {
@@ -210,10 +314,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
stage = sbe::makeS<sbe::FilterStage<false, true>>(
std::move(stage),
makeBinaryOp(sbe::EPrimBinary::lessEq,
- sbe::makeE<sbe::EVariable>(*tsSlot),
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::Timestamp,
- sbe::value::bitcastFrom<uint64_t>((*csn->maxTs).asULL()))),
+ makeVariable(*tsSlot),
+ makeConstant(sbe::value::TypeTags::Timestamp, csn->maxTs->asULL())),
csn->nodeId());
}
@@ -226,16 +328,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
relevantSlots.push_back(*tsSlot);
}
- auto [_, filterStage] = generateFilter(opCtx,
- csn->filter.get(),
- std::move(stage),
- slotIdGenerator,
- frameIdGenerator,
- resultSlot,
- env,
- std::move(relevantSlots),
- csn->nodeId());
- stage = std::move(filterStage);
+ std::tie(std::ignore, stage) = generateFilter(opCtx,
+ csn->filter.get(),
+ std::move(stage),
+ slotIdGenerator,
+ frameIdGenerator,
+ resultSlot,
+ env,
+ std::move(relevantSlots),
+ csn->nodeId());
// We may be requested to stop applying the filter after the first match. This can happen
// if the query is just a lower bound on 'ts' on a forward scan. In this case every document
@@ -366,9 +467,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
sbe::makeS<sbe::CoScanStage>(csn->nodeId()), 1, boost::none, csn->nodeId()),
csn->nodeId(),
seekSlot,
- sbe::makeE<sbe::EConstant>(
- sbe::value::TypeTags::RecordId,
- sbe::value::bitcastFrom<int64_t>(csn->resumeAfterRecordId->asLong())));
+ makeConstant(sbe::value::TypeTags::RecordId, csn->resumeAfterRecordId->asLong()));
// Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor,
// the branch will output the 'seekSlot' to start the real scan from, otherwise it will
@@ -437,16 +536,15 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateGenericCollSc
relevantSlots.push_back(*tsSlot);
}
- auto [_, filterStage] = generateFilter(opCtx,
- csn->filter.get(),
- std::move(stage),
- slotIdGenerator,
- frameIdGenerator,
- resultSlot,
- env,
- std::move(relevantSlots),
- csn->nodeId());
- stage = std::move(filterStage);
+ std::tie(std::ignore, stage) = generateFilter(opCtx,
+ csn->filter.get(),
+ std::move(stage),
+ slotIdGenerator,
+ frameIdGenerator,
+ resultSlot,
+ env,
+ std::move(relevantSlots),
+ csn->nodeId());
}
PlanStageSlots outputs;
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
index b103ec7dc5b..86c5bad1eb2 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
@@ -203,6 +203,12 @@ std::unique_ptr<sbe::EExpression> makeFillEmptyFalse(std::unique_ptr<sbe::EExpre
sbe::value::bitcastFrom<bool>(false)));
}
+std::unique_ptr<sbe::EExpression> makeVariable(sbe::value::SlotId slotId,
+ boost::optional<sbe::FrameId> frameId) {
+ return frameId ? sbe::makeE<sbe::EVariable>(*frameId, slotId)
+ : sbe::makeE<sbe::EVariable>(slotId);
+}
+
std::unique_ptr<sbe::EExpression> makeFillEmptyNull(std::unique_ptr<sbe::EExpression> e) {
using namespace std::literals;
return makeFunction(
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h
index 117de1dfcae..d51a56f1902 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.h
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.h
@@ -202,6 +202,9 @@ inline auto makeConstant(std::string_view str) {
return sbe::makeE<sbe::EConstant>(tag, value);
}
+std::unique_ptr<sbe::EExpression> makeVariable(sbe::value::SlotId slotId,
+ boost::optional<sbe::FrameId> frameId = {});
+
/**
* Check if expression returns Nothing and return null if so. Otherwise, return the
* expression.