summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-01-07 23:23:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-08 11:06:37 +0000
commit0cf0d832fd76a93496e5a6df71b417a5059b0bd8 (patch)
treea7ad3e5d32b58fb2c3de390d23da4c2df330cf70
parentccaed20db0dcff679c8ac208fe08c4fda24ffd83 (diff)
downloadmongo-0cf0d832fd76a93496e5a6df71b417a5059b0bd8.tar.gz
SERVER-62081 Exhaustively test and fix change stream rewrites against null-equality predicates
(cherry picked from commit 401fff7b836f0ad71d0e9e714a5801287dba8ce1)
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js13
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js7
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js35
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js286
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp391
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.h9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp694
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp28
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h3
9 files changed, 1242 insertions, 224 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
index 4a293da3290..a9a7bfc3455 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
@@ -135,6 +135,12 @@ for (const op of ["insert", "update", "replace", "delete"]) {
[[op, 3, 0], [op, 3, 1]],
[1, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an {$eq: null} predicate on 'documentKey._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey._id": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this
// predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2
// docs on each shard.
@@ -150,6 +156,13 @@ for (const op of ["insert", "update", "replace", "delete"]) {
[[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
[2, 2] /* expectedOplogCursorReturnedDocs */);
+ // Test out the '{$eq: null}' predicate on a field that doesn't exist in 'documentKey' but that
+ // does exist in some of the underlying documents.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey.z": {$eq: null}}},
+ [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
// Test out an $expr predicate on the full 'documentKey' field.
verifyOps(
resumeAfterToken,
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
index 61ce95e1e79..8f7262553de 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
@@ -141,6 +141,13 @@ verifyNonInvalidatingOps(resumeAfterToken,
[] /* expectedOps */,
0 /* expectedOplogRetDocsForEachShard */);
+// Ensure that the '$eq: null' on operation type sub-field can be rewritten to the oplog format. The
+// oplog cursor should return all documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {"operationType.subField": {$eq: null}}},
+ ["insert", "update", "replace", "delete"] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
// Ensure that the '$match' on the operation type with '$in' is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: ["insert", "update"]}}},
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
index 13fbbad16b5..fcbd2f6b9c6 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
@@ -141,6 +141,26 @@ for (const reverseShards of [false, true]) {
[[op, 2, 0]],
[2, 2] /* expectedOplogCursorReturnedDocs */);
+ // Test out an $eq:null predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq:null predicate on 'updateDescription.updatedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on 'updateDescription.updatedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out an $eq predicate on 'updateDescription.updatedFields.f'.
verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}},
[[op, 3, 0]],
@@ -171,6 +191,16 @@ for (const reverseShards of [false, true]) {
[[op, 3, 1]],
[0, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an $eq:null predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'.
verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}},
[[op, 2, 0], [op, 2, 1]],
@@ -218,6 +248,11 @@ for (const reverseShards of [false, true]) {
[[op, 2, 0], [op, 3, 0], [op, 3, 1]],
[2, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an {$eq:null} predicate on 'updateDescription.updatedFields.g'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$eq: null}}},
+ [[op, 2, 0], [op, 3, 0], [op, 3, 1]],
+ [2, 1] /* expectedOplogCursorReturnedDocs */);
+
// Test out a negated $eq predicate on 'updateDescription.removedFields'.
verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}},
[[op, 3, 0], [op, 3, 1]],
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
new file mode 100644
index 00000000000..43df7cfd1bb
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
@@ -0,0 +1,286 @@
+/**
+ * Tests that change streams correctly handle rewrites of null, existence and equality checks, for
+ * both existent and non-existent fields and subfields.
+ * @tags: [
+ * featureFlagChangeStreamsRewrite,
+ * requires_fcv_51,
+ * requires_pipeline_optimization,
+ * uses_change_streams
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+load("jstests/libs/fixture_helpers.js"); // For isMongos.
+
+const dbName = "change_stream_rewrite_null_existence_test";
+
+const collName = "coll1";
+const collNameAfterRename = "coll_renamed";
+
+// Establish a resume token at a point before anything actually happens in the test.
+const startPoint = db.getMongo().watch().getResumeToken();
+
+// If this is a sharded passthrough, make sure we shard on something other than _id.
+if (FixtureHelpers.isMongos(db)) {
+ assertDropCollection(db, collName);
+ assert.commandWorked(db.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(
+ db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}}));
+}
+
+const testDB = db.getSiblingDB(dbName);
+let testColl = testDB[collName];
+
+const numDocs = 8;
+
+// Generate the write workload.
+(function performWriteWorkload() {
+ // Insert some documents.
+ for (let i = 0; i < numDocs; ++i) {
+ assert.commandWorked(testColl.insert(
+ {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false}));
+ }
+
+ // Update half of them. We generate these updates individually so that they generate different
+ // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields.
+ const updateSpecs = [
+ [{$set: {f2: true}}], // only populates 'updatedFields'
+ [{$unset: ["f1"]}], // only populates 'removedFields'
+ [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays'
+ [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields
+ ];
+ for (let i = 0; i < numDocs / 2; ++i) {
+ assert.commandWorked(
+ testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)]));
+ }
+
+ // Replace the other half.
+ for (let i = numDocs / 2; i < numDocs; ++i) {
+ assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i}));
+ }
+
+ // Delete half of the updated documents.
+ for (let i = 0; i < numDocs / 4; ++i) {
+ assert.commandWorked(testColl.remove({_id: i, shardKey: i}));
+ }
+})();
+
+// Rename the collection.
+assert.commandWorked(testColl.renameCollection(collNameAfterRename));
+testColl = testDB[collNameAfterRename];
+
+// Drop the collection.
+assert(testColl.drop());
+
+// Drop the database.
+assert.commandWorked(testDB.dropDatabase());
+
+// Function to generate a list of all paths to be tested from those observed in the event stream.
+function traverseEvent(event, outputMap, prefixPath = "") {
+ // The number of values to equality-test for each path. Set this to Infinity to test everything.
+ const maxValuesPerPath = 1;
+
+ // Begin traversing through the event, adding paths and values into 'outputMap'.
+ for (let fieldName in event) {
+ const fieldPath = (prefixPath.length > 0 ? prefixPath + "." : "") + fieldName;
+ const fieldVal = event[fieldName];
+
+ // Create an entry for this field if it doesn't already exist.
+ if (!outputMap[fieldPath]) {
+ outputMap[fieldPath] = {extraValues: [], values: []};
+ }
+
+ // Add entries for each of the standard subfields that we test for every existent field.
+ for (let subField of standardSubFieldsToTest) {
+ const subFieldPathToAdd = fieldPath + "." + subField;
+ if (!outputMap[subFieldPathToAdd]) {
+ outputMap[subFieldPathToAdd] = {extraValues: [], values: []};
+ }
+ }
+
+ // Helper function to add a new value into the fields list.
+ function addToPredicatesList(fieldPath, fieldVal) {
+ const alreadyExists =
+ outputMap[fieldPath].values.some((elem) => friendlyEqual(elem, fieldVal));
+ const numValues = outputMap[fieldPath].values.length;
+ if (!alreadyExists && numValues < maxValuesPerPath) {
+ outputMap[fieldPath].values.push(fieldVal);
+ }
+ }
+
+ // Helper function to check whether this value is a plain old javascript object.
+ function isPlainObject(value) {
+ return (value && typeof (value) == "object" && value.constructor === Object);
+ }
+
+ // Add a predicate on the full field, whether scalar, object, or array.
+ addToPredicatesList(fieldPath, fieldVal);
+
+ // If the field is an object, traverse through it.
+ if (isPlainObject(fieldVal)) {
+ traverseEvent(fieldVal, outputMap, fieldPath);
+ }
+
+ // If the field is an array, find any subobjects and traverse them.
+ if (Array.isArray(fieldVal)) {
+ for (let arrayElem of fieldVal) {
+ if (isPlainObject(arrayElem)) {
+ traverseEvent(arrayElem, outputMap, fieldPath);
+ } else {
+ addToPredicatesList(fieldPath, arrayElem);
+ }
+ }
+ // Traverse through the array itself as an object. This will descend into the array by
+ // index, allowing us to test fieldname-or-array-index matching semantics.
+ traverseEvent(fieldVal, outputMap, fieldPath);
+ }
+ }
+}
+
+// Helper function to fully exhaust a change stream from the start point and return all events.
+function getAllChangeStreamEvents(extraStages, csOptions) {
+ // Open a whole-cluster stream based on the supplied arguments.
+ const csCursor = testDB.getMongo().watch(
+ extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions));
+
+ // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will
+ // guarantee that all shards have returned results, and we expect all results to fit into a
+ // single batch, so we know we have exhausted the stream.
+ while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) {
+ csCursor.hasNext(); // runs a getMore
+ }
+
+ // Close the cursor since we have already retrieved all results.
+ csCursor.close();
+
+ // Extract all events from the streams. Since the cursor is closed, it will not attempt to
+ // retrieve any more batches from the server.
+ return csCursor.toArray();
+}
+
+// Obtain a list of all events that occurred during the write workload.
+const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"});
+
+jsTestLog(`All events: ${tojson(allEvents)}`);
+
+// List of specific fields and values that we wish to test. This will be populated during traversal
+// of the events in the stream, but we can add further paths and extra values which will not appear
+// in the stream but which we nonetheless want to test. Note that null and existence predicates will
+// be tested for every field, and do not need to be explicitly specified here. The format of each
+// entry is as follows:
+//
+// {
+// "full.path.to.field": {
+// extraValues: [special values we wish to test that do not appear in the stream],
+// values: [automatically populated by examining the stream]
+// }
+// }
+const fieldsToBeTested = {
+ // Test documentKey with a field that is in the full object but not in the documentKey.
+ "documentKey": {extraValues: [{f2: null, _id: 1}], values: []},
+ "documentKey.f1": {extraValues: [{subField: true}], values: []}
+};
+
+// Always test these subfields for all parent fields.
+const standardSubFieldsToTest = ["nonExistentField"];
+
+// Traverse each event in the stream and build up a map of all field paths.
+allEvents.forEach((event) => traverseEvent(event, fieldsToBeTested));
+
+jsTestLog(`Final set of fields to test: ${tojson(fieldsToBeTested)}`);
+
+// Define the filters that we want to apply to each field.
+function generateMatchFilters(fieldPath) {
+ const valuesToTest =
+ fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues);
+
+ const filters = [
+ {[fieldPath]: {$eq: null}},
+ {[fieldPath]: {$ne: null}},
+ {[fieldPath]: {$lte: null}},
+ {[fieldPath]: {$gte: null}},
+ {[fieldPath]: {$exists: true}},
+ {[fieldPath]: {$exists: false}}
+ ];
+
+ for (let value of valuesToTest) {
+ filters.push({[fieldPath]: value});
+ }
+
+ return filters;
+}
+function generateExprFilters(fieldPath) {
+ const valuesToTest =
+ fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues);
+
+ const exprFieldPath = "$" + fieldPath;
+ const exprs = [
+ {$expr: {$eq: [exprFieldPath, null]}},
+ {$expr: {$ne: [exprFieldPath, null]}},
+ {$expr: {$lte: [exprFieldPath, null]}},
+ {$expr: {$gte: [exprFieldPath, null]}},
+ {$expr: {$eq: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$ne: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$lte: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$gte: [exprFieldPath, "$$REMOVE"]}}
+ ];
+
+ for (let value of valuesToTest) {
+ exprs.push({$expr: {$eq: [exprFieldPath, value]}});
+ }
+
+ return exprs;
+}
+
+// Record all failed test cases to be reported at the end of the test.
+const failedTestCases = [];
+
+// Confirm that the output of an optimized change stream matches an unoptimized stream.
+for (let csConfig of [{fullDocument: "updateLookup"}]) {
+ for (let fieldToTest in fieldsToBeTested) {
+ const predicatesToTest =
+ generateMatchFilters(fieldToTest).concat(generateExprFilters(fieldToTest));
+ for (let predicate of predicatesToTest) {
+ // Create a $match expression for the current predicate.
+ const matchExpr = {$match: predicate};
+
+ jsTestLog(`Testing filter ${tojsononeline(matchExpr)} with ${tojsononeline(csConfig)}`);
+
+ // Construct one optimized pipeline, and one which inhibits optimization.
+ const nonOptimizedPipeline = [{$_internalInhibitOptimization: {}}, matchExpr];
+ const optimizedPipeline = [matchExpr];
+
+ // Extract all results from each of the pipelines.
+ const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig);
+ const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig);
+
+ // We never expect to see more optimized results than unoptimized.
+ assert(optimizedOutput.length <= nonOptimizedOutput.length,
+ {optimizedOutput: optimizedOutput, nonOptimizedOutput: nonOptimizedOutput});
+
+ // Check the unoptimized results against the optimized results. If we observe an entry
+ // in the non-optimized array that is not present in the optimized, add the details to
+ // 'failedTestCases' and continue.
+ for (let i = 0; i < nonOptimizedOutput.length; ++i) {
+ try {
+ assert(i < optimizedOutput.length);
+ assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i]));
+ } catch (error) {
+ failedTestCases.push({
+ matchExpr: matchExpr,
+ csConfig: csConfig,
+ events: {nonOptimized: nonOptimizedOutput[i], optimized: optimizedOutput[i]}
+ });
+ jsTestLog(`Total failures: ${failedTestCases.length}`);
+ break;
+ }
+ }
+ }
+ }
+}
+
+// Assert that there were no failed test cases.
+assert(failedTestCases.length == 0, failedTestCases);
+})(); \ No newline at end of file
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index 3dee9c53520..9e1cd0d308e 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -49,6 +49,32 @@ using AggExpressionRewrite =
namespace {
/**
+ * Helpers to clone an expression to the same type and rename the fields to which it applies.
+ */
+std::unique_ptr<PathMatchExpression> cloneWithSubstitution(
+ const PathMatchExpression* predicate, const StringMap<std::string>& renameList) {
+ auto clonedPred = std::unique_ptr<PathMatchExpression>(
+ static_cast<PathMatchExpression*>(predicate->shallowClone().release()));
+ clonedPred->applyRename(renameList);
+ return clonedPred;
+}
+boost::intrusive_ptr<ExpressionFieldPath> cloneWithSubstitution(
+ const ExpressionFieldPath* expr, const StringMap<std::string>& renameList) {
+ return static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution(renameList).release());
+}
+
+/**
+ * Helper to resolve a predicate on a non-existent field to either AlwaysTrue or AlwaysFalse.
+ */
+std::unique_ptr<MatchExpression> resolvePredicateOnNonExistentField(
+ const PathMatchExpression* predicate) {
+ if (predicate->matchesSingleElement({})) {
+ return std::make_unique<AlwaysTrueMatchExpression>();
+ }
+ return std::make_unique<AlwaysFalseMatchExpression>();
+}
+
+/**
* Rewrites filters on 'operationType' in a format that can be applied directly to the oplog.
* Returns nullptr if the predicate cannot be rewritten.
*
@@ -67,9 +93,9 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType(
str::stream() << "Unexpected predicate on " << predicate->path(),
predicate->fieldRef()->getPart(0) == DocumentSourceChangeStream::kOperationTypeField);
- // If the query is on a subfield of operationType, it will never match.
+ // If the query is on a subfield of operationType, it will always be missing.
if (predicate->fieldRef()->numParts() > 1) {
- return std::make_unique<AlwaysFalseMatchExpression>();
+ return resolvePredicateOnNonExistentField(predicate);
}
static const auto kExistsTrue = Document{{"$exists", true}};
@@ -223,8 +249,7 @@ std::unique_ptr<MatchExpression> matchRewriteDocumentKey(
// Helper to generate a filter on the 'op' field for the specified type. This filter will also
// include a copy of 'predicate' with the path renamed to apply to the oplog.
auto generateFilterForOp = [&](StringData op, const StringMap<std::string>& renameList) {
- auto renamedPredicate = predicate->shallowClone();
- static_cast<PathMatchExpression*>(renamedPredicate.get())->applyRename(renameList);
+ auto renamedPredicate = cloneWithSubstitution(predicate, renameList);
auto andExpr = std::make_unique<AndMatchExpression>();
andExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value(op)));
@@ -235,13 +260,31 @@ std::unique_ptr<MatchExpression> matchRewriteDocumentKey(
// The MatchExpression which will contain the final rewritten predicate.
auto rewrittenPredicate = std::make_unique<OrMatchExpression>();
+ // Handle the case of non-CRUD events. The 'documentKey' field never exists for such events, so
+ // we evaluate the predicate against a non-existent field to see whether it matches.
+ if (predicate->matchesSingleElement({})) {
+ auto nonCRUDCase = MatchExpressionParser::parseAndNormalize(
+ fromjson("{$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}"), expCtx);
+ rewrittenPredicate->add(std::move(nonCRUDCase));
+ }
+
// Handle update, replace and delete. The predicate path can simply be renamed.
rewrittenPredicate->add(generateFilterForOp("u"_sd, {{"documentKey", "o2"}}));
rewrittenPredicate->add(generateFilterForOp("d"_sd, {{"documentKey", "o"}}));
- // If the path is a subfield of 'documentKey', inserts can also be handled by renaming.
+ // If the path is a subfield of 'documentKey', inserts can also be handled by renaming, as long
+ // as the path starts with _id or the predicate does not match a missing field.
if (predicate->fieldRef()->numParts() > 1) {
- rewrittenPredicate->add(generateFilterForOp("i"_sd, {{"documentKey", "o"}}));
+ // If the predicate matches against a missing field and is on a field which exists in the
+ // full document but not the documentKey, then applying that predicate to the 'o' field in
+ // the oplog will discard entries that would have matched the eventual change stream event.
+ if (pathStartsWithDKId || !predicate->matchesSingleElement({})) {
+ rewrittenPredicate->add(generateFilterForOp("i"_sd, {{"documentKey", "o"}}));
+ } else {
+ // We can't rewrite the predicate, so we have to match all {op: "i"} events.
+ rewrittenPredicate->add(
+ std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
+ }
return rewrittenPredicate;
}
@@ -340,18 +383,16 @@ boost::intrusive_ptr<Expression> exprRewriteDocumentKey(
std::vector<BSONObj> opCases;
// Cases for 'insert' and 'delete'.
- auto insertAndDeletePath =
- static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution({{"documentKey", "o"}}).get())
- ->getFieldPathWithoutCurrentPrefix()
- .fullPathWithPrefix();
+ auto insertAndDeletePath = cloneWithSubstitution(expr, {{"documentKey", "o"}})
+ ->getFieldPathWithoutCurrentPrefix()
+ .fullPathWithPrefix();
opCases.push_back(
fromjson("{case: {$in: ['$op', ['i', 'd']]}, then: '" + insertAndDeletePath + "'}"));
// Cases for 'update' and 'replace'.
- auto updateAndReplacePath =
- static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution({{"documentKey", "o2"}}).get())
- ->getFieldPathWithoutCurrentPrefix()
- .fullPathWithPrefix();
+ auto updateAndReplacePath = cloneWithSubstitution(expr, {{"documentKey", "o2"}})
+ ->getFieldPathWithoutCurrentPrefix()
+ .fullPathWithPrefix();
opCases.push_back(
fromjson("{case: {$eq: ['$op', 'u']}, then: '" + updateAndReplacePath + "'}"));
@@ -397,7 +438,7 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
// {$or: [
// {$and: [{op: 'u'}, {'o._id': {$exists: false}}]},
// {$and: [
- // {$or: [{op: 'i'}, {op: 'u'}]},
+ // {$or: [{op: 'i'}, {op: 'u', 'o._id': {$exists: true}}]},
// {o: <predicate>}
// ]},
// // The following predicates are only present if the predicate matches a missing field
@@ -418,14 +459,11 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
// cases, because the full document is present in the oplog.
auto insertOrReplaceCase = std::make_unique<AndMatchExpression>();
- auto insertOrReplaceOpFilter = std::make_unique<OrMatchExpression>();
- insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd)));
- insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
+ auto insertOrReplaceOpFilter = MatchExpressionParser::parseAndNormalize(
+ fromjson("{$or: [{op: 'i'}, {op: 'u', 'o._id': {$exists: true}}]}"), expCtx);
insertOrReplaceCase->add(std::move(insertOrReplaceOpFilter));
- auto predForInsertOrReplace = predicate->shallowClone();
- static_cast<PathMatchExpression*>(predForInsertOrReplace.get())
- ->applyRename({{"fullDocument", "o"}});
+ auto predForInsertOrReplace = cloneWithSubstitution(predicate, {{"fullDocument", "o"}});
insertOrReplaceCase->add(std::move(predForInsertOrReplace));
rewrittenPredicate->add(std::move(insertOrReplaceCase));
@@ -458,136 +496,193 @@ std::unique_ptr<MatchExpression> matchRewriteUpdateDescription(
predicate->fieldRef()->getPart(0) ==
DocumentSourceChangeStream::kUpdateDescriptionField);
- // Check that this is a non-replacement update, i.e. {op: "u", "o._id": {$exists: false}}.
- auto rewrittenPredicate = std::make_unique<AndMatchExpression>();
- rewrittenPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
- rewrittenPredicate->add(
- std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd)));
-
- // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can generate
- // a rewritten predicate that matches exactly like so:
- //
- // {updateDescription.updatedFields.<fieldName>: <pred>}
- // =>
- // {$and: [
- // {op: "u"},
- // {"o._id": {$exists: false}},
- // {$or: [
- // {o.diff.i.<fieldName>: <pred>},
- // {o.diff.u.<fieldName>: <pred>},
- // {o.$set.<fieldName>: <pred>}
- // ]}
- // ]}
- if (predicate->fieldRef()->numParts() == 3 &&
- predicate->fieldRef()->getPart(1) == "updatedFields"_sd) {
- // The oplog field corresponding to "updateDescription.updatedFields" can be in any one of
- // three locations. Construct an $or filter to match against them all.
- static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"};
- auto updatedFieldsOr = std::make_unique<OrMatchExpression>();
- for (auto&& oplogField : oplogFields) {
- auto updateRewrite = predicate->shallowClone();
- static_cast<PathMatchExpression*>(updateRewrite.get())
- ->applyRename({{"updateDescription.updatedFields", oplogField}});
- updatedFieldsOr->add(std::move(updateRewrite));
- }
- // Add the $or into the final rewritten predicate and return.
- rewrittenPredicate->add(std::move(updatedFieldsOr));
- return rewrittenPredicate;
- }
-
- // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can generate
- // a rewritten predicate that matches exactly like so:
- //
- // {updateDescription.removedFields: {$eq: <fieldName>}}
- // =>
- // {$and: [
- // {op: "u"},
- // {"o._id": {$exists: false}},
- // {$or: [
- // {o.diff.d.<fieldName>: {$exists: true}},
- // {o.$unset.<fieldName>: {$exists: true}}
- // ]}
- // ]}
- //
- // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}}
- // =>
- // {$and: [
- // {op: "u"},
- // {"o._id": {$exists: false}},
- // {$or: [
- // {o.diff.d.<fieldName1>: {$exists: true}},
- // {o.$unset.<fieldName1>: {$exists: true}},
- // {o.diff.d.<fieldName2>: {$exists: true}},
- // {o.$unset.<fieldName2>: {$exists: true}},
- // ..
- // ]}
- // ]}
- if (predicate->fieldRef()->numParts() == 2 &&
- predicate->fieldRef()->getPart(1) == "removedFields"_sd) {
- // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog.
- auto rewriteEqualityForOplog = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> {
- // We can only rewrite equality matches on strings.
- if (rhsElem.type() != BSONType::String) {
- return nullptr;
+ // We can't determine whether we can perform a strict rewrite until we examine the predicate. We
+ // wrap this in a helper function that we can call while building the filter. We try to rewrite
+ // the predicate assuming that it will only be applied to non-replacement update oplog events.
+ auto tryExactRewriteForUpdateEvents = [](auto predicate) -> std::unique_ptr<MatchExpression> {
+ // $exists and null-equality checks on 'updateDescription' or its immediate subfields are
+ // AlwaysTrue or AlwaysFalse, since these fields are always present in the update event.
+ static const std::set<std::string> existentFields = {"updateDescription",
+ "updateDescription.updatedFields",
+ "updateDescription.removedFields",
+ "updateDescription.truncatedArrays"};
+ if (existentFields.count(predicate->path().toString())) {
+ // An {$exists:true} predicate will always match against any of these fields.
+ if (predicate->matchType() == MatchExpression::EXISTS) {
+ return std::make_unique<AlwaysTrueMatchExpression>();
}
- // We can only rewrite top-level fields, i.e. no dotted subpaths.
- auto fieldName = rhsElem.str();
- if (FieldRef(fieldName).numParts() > 1) {
- return nullptr;
+ // We check whether this is a ComparisonMatchExpression to ensure that the predicate is
+ // type-bracketed, which means that it will *only* match missing, null, or undefined.
+ // None of these fields will ever be null or undefined in the change stream event.
+ if (ComparisonMatchExpression::isComparisonMatchExpression(predicate) &&
+ predicate->matchesSingleElement({})) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
}
- // The oplog field corresponding to "updateDescription.removedFields" can be in either
- // of two locations. Construct an $or filter to match against them both.
- static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"};
- auto removedFieldsOr = std::make_unique<OrMatchExpression>();
+ }
+
+ // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can
+ // generate a rewritten predicate that matches exactly like so:
+ //
+ // {updateDescription.updatedFields.<fieldName>: <pred>}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.i.<fieldName>: <pred>},
+ // {o.diff.u.<fieldName>: <pred>},
+ // {o.$set.<fieldName>: <pred>}
+ // ]}
+ // ]}
+ if (predicate->fieldRef()->numParts() == 3 &&
+ predicate->fieldRef()->getPart(1) == "updatedFields"_sd) {
+ // The oplog field corresponding to "updateDescription.updatedFields" can be in any one
+ // of three locations. We will attempt to construct a filter to match against them all.
+ static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"};
+ // If this predicate matches against a missing field, then we must apply an $and to all
+ // three potential locations, since at least two of them will always be missing. If not,
+ // then we build an $or to match if the field is present at any of the locations.
+ auto rewrittenUserPredicate = [predicate]() -> std::unique_ptr<ListOfMatchExpression> {
+ if (predicate->matchesSingleElement({})) {
+ return std::make_unique<AndMatchExpression>();
+ }
+ return std::make_unique<OrMatchExpression>();
+ }();
+ // Rewrite the predicate for each of the three potential oplog locations.
for (auto&& oplogField : oplogFields) {
- removedFieldsOr->add(
- std::make_unique<ExistsMatchExpression>(oplogField + "." + fieldName));
+ rewrittenUserPredicate->add(cloneWithSubstitution(
+ predicate, {{"updateDescription.updatedFields", oplogField}}));
}
- return removedFieldsOr;
- };
-
- // We can only match against a limited number of predicates here, $eq and $in.
- switch (predicate->matchType()) {
- case MatchExpression::EQ: {
- // Try to rewrite the predicate on "updateDescription.removedFields".
- auto eqME = static_cast<const EqualityMatchExpression*>(predicate);
- if (auto removedRewrite = rewriteEqualityForOplog(eqME->getData())) {
- rewrittenPredicate->add(std::move(removedRewrite));
- return rewrittenPredicate;
+ // Return the final rewritten predicate.
+ return rewrittenUserPredicate;
+ }
+
+ // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can
+ // generate a rewritten predicate that matches exactly like so:
+ //
+ // {updateDescription.removedFields: {$eq: <fieldName>}}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.d.<fieldName>: {$exists: true}},
+ // {o.$unset.<fieldName>: {$exists: true}}
+ // ]}
+ // ]}
+ //
+ // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}}
+ // =>
+ // {$and: [
+ // {op: "u"},
+ // {"o._id": {$exists: false}},
+ // {$or: [
+ // {o.diff.d.<fieldName1>: {$exists: true}},
+ // {o.$unset.<fieldName1>: {$exists: true}},
+ // {o.diff.d.<fieldName2>: {$exists: true}},
+ // {o.$unset.<fieldName2>: {$exists: true}},
+ // ..
+ // ]}
+ // ]}
+ if (predicate->fieldRef()->numParts() == 2 &&
+ predicate->fieldRef()->getPart(1) == "removedFields"_sd) {
+ // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog.
+ auto rewriteEqOnRemovedFields = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> {
+ // We can only rewrite equality matches on strings.
+ if (rhsElem.type() != BSONType::String) {
+ return nullptr;
}
- break;
- }
- case MatchExpression::MATCH_IN: {
- // If this $in includes any regexes, we can't proceed with the rewrite.
- auto inME = static_cast<const InMatchExpression*>(predicate);
- if (!inME->getRegexes().empty()) {
+ // We can only rewrite top-level fields, i.e. no dotted subpaths.
+ auto fieldName = rhsElem.str();
+ if (FieldRef(fieldName).numParts() > 1) {
return nullptr;
}
- // An empty '$in' should never match anything.
- if (inME->getEqualities().empty()) {
- return std::make_unique<AlwaysFalseMatchExpression>();
+ // The oplog field corresponding to "updateDescription.removedFields" can be in
+ // either of two locations. Construct an $or filter to match against them both.
+ // Because we have already validated that this is an equality string match, we do
+ // not need to check whether the predicate matches a missing field in this case.
+ static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"};
+ auto rewrittenEquality = std::make_unique<OrMatchExpression>();
+ for (auto&& oplogField : oplogFields) {
+ rewrittenEquality->add(
+ std::make_unique<ExistsMatchExpression>(oplogField + "." + fieldName));
}
- // Try to rewrite the $in as an $or of equalities on the oplog. If any individual
- // rewrite fails, we must abandon the entire rewrite.
- auto inRemovedOr = std::make_unique<OrMatchExpression>();
- for (const auto& rhsElem : inME->getEqualities()) {
- if (auto removedRewrite = rewriteEqualityForOplog(rhsElem)) {
- inRemovedOr->add(std::move(removedRewrite));
- } else {
+ return rewrittenEquality;
+ };
+
+ // We can only match against a limited number of predicates here, $eq and $in.
+ switch (predicate->matchType()) {
+ case MatchExpression::EQ: {
+ // Try to rewrite the predicate on "updateDescription.removedFields".
+ auto eqME = static_cast<const EqualityMatchExpression*>(predicate);
+ return rewriteEqOnRemovedFields(eqME->getData());
+ }
+ case MatchExpression::MATCH_IN: {
+ // If this $in includes any regexes, we can't proceed with the rewrite.
+ auto inME = static_cast<const InMatchExpression*>(predicate);
+ if (!inME->getRegexes().empty()) {
return nullptr;
}
+ // An empty '$in' should never match anything.
+ if (inME->getEqualities().empty()) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+ // Try to rewrite the $in as an $or of equalities on the oplog. If any
+ // individual rewrite fails, we must abandon the entire rewrite.
+ auto rewrittenUserPredicate = std::make_unique<OrMatchExpression>();
+ for (const auto& rhsElem : inME->getEqualities()) {
+ if (auto rewrittenEquality = rewriteEqOnRemovedFields(rhsElem)) {
+ rewrittenUserPredicate->add(std::move(rewrittenEquality));
+ } else {
+ return nullptr;
+ }
+ }
+ // Return the final rewritten predicate.
+ return rewrittenUserPredicate;
}
- // Add the rewritten $in to the final rewritten predicate and return.
- rewrittenPredicate->add(std::move(inRemovedOr));
- return rewrittenPredicate;
+ default:
+ break;
}
- default:
- break;
}
+ // If we reach here, we cannot rewrite this predicate.
+ return nullptr;
+ };
+
+ // Try to rewrite the user predicate. If we can't, then we may not be able to continue.
+ auto rewrittenUserPredicate = tryExactRewriteForUpdateEvents(predicate);
+
+ // If a strict rewrite is required and we could not rewrite the predicate, return nullptr. We
+ // also return nullptr if the predicate matches a missing field, since it is pointless to try
+ // to continue; we would have to return all updates, because we don't know whether they will
+ // match, and all non-updates, because they will always match.
+ if (!rewrittenUserPredicate && (!allowInexact || predicate->matchesSingleElement({}))) {
+ return nullptr;
}
- // If we reach here, we cannot perform a rewrite.
- return nullptr;
+ // If we are here, then either we were able to rewrite the predicate, or we were not but an
+ // inexact rewrite is permissible. First write a predicate to check that this is an update
+ // that is not a full-document replacement, i.e. {op: "u", "o._id": {$exists: false}}.
+ std::unique_ptr<ListOfMatchExpression> finalPredicate = std::make_unique<AndMatchExpression>();
+ finalPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd)));
+ finalPredicate->add(
+ std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd)));
+
+ // If we were able to rewrite the user predicate, add it into the final predicate.
+ if (rewrittenUserPredicate) {
+ finalPredicate->add(std::move(rewrittenUserPredicate));
+ }
+
+ // Handle the case of non-update events. The 'updateDescription' field never exists for these
+ // events, so we evaluate the predicate against a non-existent field to see whether it matches.
+ if (predicate->matchesSingleElement({})) {
+ auto nonUpdateCase = MatchExpressionParser::parseAndNormalize(
+ fromjson("{$or: [{op: {$ne: 'u'}}, {op: 'u', 'o._id': {$exists: true}}]}"), expCtx);
+ finalPredicate = std::make_unique<OrMatchExpression>(std::move(finalPredicate), nullptr);
+ finalPredicate->add(std::move(nonUpdateCase));
+ }
+
+ // Finally, we return the complete rewritten predicate.
+ return finalPredicate;
}
// Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName",
@@ -1222,7 +1317,7 @@ boost::intrusive_ptr<Expression> rewriteAggExpressionTree(
// Some paths can be rewritten just by renaming the path.
if (renameRegistry.contains(firstPath)) {
- return fieldExpr->copyWithSubstitution(renameRegistry).release();
+ return cloneWithSubstitution(fieldExpr, renameRegistry);
}
// Other paths have custom rewrite logic.
@@ -1363,9 +1458,7 @@ std::unique_ptr<MatchExpression> rewriteMatchExpressionTree(
// Some paths can be rewritten just by renaming the path.
if (renameRegistry.contains(firstPath)) {
- auto renamedME = pathME->shallowClone();
- static_cast<PathMatchExpression*>(renamedME.get())->applyRename(renameRegistry);
- return renamedME;
+ return cloneWithSubstitution(pathME, renameRegistry);
}
// Other paths have custom rewrite logic.
@@ -1386,27 +1479,33 @@ std::unique_ptr<MatchExpression> rewriteMatchExpressionTree(
std::unique_ptr<MatchExpression> rewriteFilterForFields(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
- std::set<std::string> fields) {
+ std::set<std::string> includeFields,
+ std::set<std::string> excludeFields) {
// If we get null in, we return null immediately.
if (!userMatch) {
return nullptr;
}
// If the specified 'fields' set is empty, we rewrite every possible field.
- if (fields.empty()) {
+ if (includeFields.empty()) {
for (auto& rename : renameRegistry) {
- fields.insert(rename.first);
+ includeFields.insert(rename.first);
}
for (auto& meRewrite : matchRewriteRegistry) {
- fields.insert(meRewrite.first);
+ includeFields.insert(meRewrite.first);
}
for (auto& exprRewrite : exprRewriteRegistry) {
- fields.insert(exprRewrite.first);
+ includeFields.insert(exprRewrite.first);
}
}
+ // Remove any fields which are present in the "excludeFields" list.
+ for (auto&& excludeField : excludeFields) {
+ includeFields.erase(excludeField);
+ }
+
// Attempt to rewrite the tree. Predicates on unknown or unrequested fields will be discarded.
- return rewriteMatchExpressionTree(expCtx, userMatch, fields, true /* allowInexact */);
+ return rewriteMatchExpressionTree(expCtx, userMatch, includeFields, true /* allowInexact */);
}
} // namespace change_stream_rewrite
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.h b/src/mongo/db/pipeline/change_stream_rewrite_helpers.h
index 6ba6c67cc0f..9df83fde24f 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.h
@@ -40,13 +40,14 @@ namespace change_stream_rewrite {
* 'userMatch' filter, reducing the amount of work that subsequent change streams stage will have to
* do.
*
- * The rewrite splits out only the part of 'userMatch' that just needs those fields that are
- * indicated in the 'fields' set. When 'fields' is the empty set, the rewrite includes all paths
- * that can be rewritten.
+ * The rewrites will only be performed on fields which are present in the 'includeFields' set and
+ * absent from the 'excludeFields' set. When 'includeFields' is the empty set, the rewrite defaults
+ * to including all paths that can be rewritten.
*/
std::unique_ptr<MatchExpression> rewriteFilterForFields(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MatchExpression* userMatch,
- std::set<std::string> fields = {});
+ std::set<std::string> includeFields = {},
+ std::set<std::string> excludeFields = {});
} // namespace change_stream_rewrite
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5dcb4550ba3..c2cbe9ea701 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -3928,6 +3928,9 @@ public:
}
};
+//
+// Logical rewrites
+//
TEST_F(ChangeStreamRewriteTest, RewriteOrPredicateOnRenameableFields) {
auto spec = fromjson("{$or: [{clusterTime: {$type: [17]}}, {lsid: {$type: [16]}}]}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4045,57 +4048,6 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteExprWhenAllFieldsAreRenameable) {
"{$isNumber: ['$lsid']}]}}"));
}
-TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) {
- auto spec = fromjson("{$expr: {$eq: ['$operationType', 'insert']}}");
- auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
- ASSERT_OK(statusWithMatchExpression.getStatus());
-
- auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
- getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
-
- auto rewrittenPredicate = rewrittenMatchExpression->serialize();
-
- const string expectedRewrite = R"(
-{
- $expr: {
- $eq: [
- {
- $switch: {
- branches: [
- {case: {$eq: ['$op', {$const: 'i'}]}, then: {$const: 'insert'}},
- {
- case: {
- $and:
- [{$eq: ['$op', {$const: 'u'}]}, {$eq: ['$o._id', '$$REMOVE']}]
- },
- then: {$const: 'update'}
- },
- {
- case: {
- $and:
- [{$eq: ['$op', {$const: 'u'}]}, {$ne: ['$o._id', '$$REMOVE']}]
- },
- then: {$const: 'replace'}
- },
- {case: {$eq: ['$op', {$const: 'd'}]}, then: {$const: 'delete'}},
- {case: {$ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'},
- {case: {$ne: ['$o.drop', '$$REMOVE']}, then: {$const: 'drop'}},
- {
- case: {$ne: ['$o.dropDatabase', '$$REMOVE']},
- then: {$const: 'dropDatabase'}
- },
- {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}}
- ],
- default: '$$REMOVE'
- }
- },
- {$const: 'insert'}
- ]
- }
-})";
- ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson(expectedRewrite));
-}
-
TEST_F(ChangeStreamRewriteTest, CanInexactlyRewriteExprAndWithUnrewritableChild) {
// Note that rewrite of "$$ROOT" without a path is unsupported.
auto spec = fromjson("{$expr: {$and: [{$isNumber: '$lsid'}, {$isArray: '$$ROOT'}]}}");
@@ -4251,6 +4203,9 @@ TEST_F(ChangeStreamRewriteTest, DoesNotRewriteUnrequestedFieldInExpr) {
ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$expr: {$and: [{$isNumber: ['$ts']}]}}"));
}
+//
+// 'operationType' rewrites
+//
TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithInvalidOperandType) {
auto statusWithMatchExpression =
MatchExpressionParser::parse(BSON("operationType" << 1), getExpCtx());
@@ -4275,6 +4230,19 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnkno
ASSERT_FALSE(rewrittenMatchExpression);
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationType) {
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("operationType" << BSONNULL), getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeInsert) {
auto spec = fromjson("{operationType: 'insert'}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4368,6 +4336,19 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeSubField) {
ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationTypeSubField) {
+ auto spec = fromjson("{'operationType.subField': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysTrue: 1}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("drop"
<< "insert")));
@@ -4463,6 +4444,60 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNinPredicateOnOperationType) {
fromjson("{op: {$eq: 'i'}}"))))));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) {
+ auto spec = fromjson("{$expr: {$eq: ['$operationType', 'insert']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+
+ const string expectedRewrite = R"(
+{
+ $expr: {
+ $eq: [
+ {
+ $switch: {
+ branches: [
+ {case: {$eq: ['$op', {$const: 'i'}]}, then: {$const: 'insert'}},
+ {
+ case: {
+ $and:
+ [{$eq: ['$op', {$const: 'u'}]}, {$eq: ['$o._id', '$$REMOVE']}]
+ },
+ then: {$const: 'update'}
+ },
+ {
+ case: {
+ $and:
+ [{$eq: ['$op', {$const: 'u'}]}, {$ne: ['$o._id', '$$REMOVE']}]
+ },
+ then: {$const: 'replace'}
+ },
+ {case: {$eq: ['$op', {$const: 'd'}]}, then: {$const: 'delete'}},
+ {case: {$ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'},
+ {case: {$ne: ['$o.drop', '$$REMOVE']}, then: {$const: 'drop'}},
+ {
+ case: {$ne: ['$o.dropDatabase', '$$REMOVE']},
+ then: {$const: 'dropDatabase'}
+ },
+ {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}}
+ ],
+ default: '$$REMOVE'
+ }
+ },
+ {$const: 'insert'}
+ ]
+ }
+})";
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson(expectedRewrite));
+}
+
+//
+// 'documentKey' rewrites
+//
TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnFieldDocumentKey) {
auto spec = fromjson("{documentKey: {_id: 'bar', foo: 'baz'}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4493,6 +4528,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnFieldDocumentKey) {
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKey) {
+ auto spec = fromjson("{documentKey: {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$nor: ["
+ " {op: {$eq: 'i'}},"
+ " {op: {$eq: 'u'}},"
+ " {op: {$eq: 'd'}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {o2: {$eq: null}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'd'}},"
+ " {o: {$eq: null}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'i'}},"
+ " {$alwaysFalse: 1}"
+ " ]}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnFieldDocumentKey) {
auto spec = fromjson("{documentKey: {$in: [{}, {_id: 'bar'}, {_id: 'bar', foo: 'baz'}]}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4570,6 +4637,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldDocumentKeyId
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKeyId) {
+ auto spec = fromjson("{'documentKey._id': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$nor: ["
+ " {op: {$eq: 'i'}},"
+ " {op: {$eq: 'u'}},"
+ " {op: {$eq: 'd'}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o2._id': {$eq: null}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'd'}},"
+ " {'o._id': {$eq: null}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'i'}},"
+ " {'o._id': {$eq: null}}"
+ " ]}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanExactlyRewriteExprPredicateOnFieldDocumentKeyId) {
auto spec = fromjson("{$expr: {$lt: ['$documentKey._id', 'bar']}}");
@@ -4624,6 +4723,35 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldDocumentKeyFo
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKeyFoo) {
+ auto spec = fromjson("{'documentKey.foo': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$nor: ["
+ " {op: {$eq: 'i'}},"
+ " {op: {$eq: 'u'}},"
+ " {op: {$eq: 'd'}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o2.foo': {$eq: null}}"
+ " ]},"
+ " {$and: ["
+ " {op: {$eq: 'd'}},"
+ " {'o.foo': {$eq: null}}"
+ " ]},"
+ " {op: {$eq: 'i'}}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldDocumentKey) {
auto spec = fromjson("{documentKey: {$not: {$eq: {_id: 'bar'}}}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4721,6 +4849,9 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewriteExprPredicateOnFieldDocument
ASSERT(rewrittenMatchExpression == nullptr);
}
+//
+// 'fullDocument' rewrites
+//
TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentFoo) {
auto spec = fromjson("{'fullDocument.foo': {$lt: 'bar'}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4739,8 +4870,11 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentF
" ]},"
" {$and: ["
" {$or: ["
- " {op: {$eq: 'i'}},"
- " {op: {$eq: 'u'}}"
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$eq: 'i'}}"
" ]},"
" {'o.foo': {$lt: 'bar'}}"
" ]}"
@@ -4769,8 +4903,11 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNullComparisonPredicateOnFieldFullDocu
" ]},"
" {$and: ["
" {$or: ["
- " {op: {$eq: 'i'}},"
- " {op: {$eq: 'u'}}"
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$eq: 'i'}}"
" ]},"
" {'o.foo': {$eq: null}}"
" ]},"
@@ -4793,6 +4930,9 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocument
ASSERT(rewrittenMatchExpression == nullptr);
}
+//
+// 'ns' rewrites
+//
TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) {
auto expCtx = getExpCtx();
auto statusWithMatchExpression = MatchExpressionParser::parse(
@@ -5782,6 +5922,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnInvalidFieldPath) {
fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}"));
}
+//
+// 'to' rewrites
+//
TEST_F(ChangeStreamRewriteTest, CanRewriteFullToObject) {
auto expCtx = getExpCtx();
auto statusWithMatchExpression = MatchExpressionParser::parse(
@@ -6496,7 +6639,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteToWithExprOnInvalidCollSubFieldPath) {
fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}"));
}
-TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription) {
+//
+// 'updateDescription' rewrites
+//
+TEST_F(ChangeStreamRewriteTest, CanInexactlyRewritePredicateOnFieldUpdateDescription) {
auto expCtx = getExpCtx();
auto expr = fromjson(
"{updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: []}}");
@@ -6505,11 +6651,82 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription)
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldUpdateDescription) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson(
+ "{updateDescription: {$ne: {updatedFields: {}, removedFields: [], truncatedArrays: []}}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
-TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUpdatedFields) {
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescription) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{updateDescription: {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize
+ // away the enclosing $and so that only non-updates will be returned from the oplog scan.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysFalse: 1}"
+ " ]},"
+ " {$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$not: {$eq: 'u'}}}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescription) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{updateDescription: {$exists: true}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be
+ // optimized away and will have no functional impact on the filter.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysTrue: 1}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanInexactlyRewritePredicateOnFieldUpdateDescriptionUpdatedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.updatedFields': {}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6517,10 +6734,81 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUp
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewritePredicateOnFieldUpdateDescriptionUpdatedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields': {$ne: {}}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize
+ // away the enclosing $and so that only non-updates will be returned from the oplog scan.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysFalse: 1}"
+ " ]},"
+ " {$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$not: {$eq: 'u'}}}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescriptionUpdatedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields': {$exists: true}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be
+ // optimized away and will have no functional impact on the filter.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysTrue: 1}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest,
CanRewriteArbitraryPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) {
auto expCtx = getExpCtx();
@@ -6545,8 +6833,41 @@ TEST_F(ChangeStreamRewriteTest,
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields.foo': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that we perform an $and of all three oplog locations for this rewrite.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$and: ["
+ " {'o.diff.i.foo': {$eq: null}},"
+ " {'o.diff.u.foo': {$eq: null}},"
+ " {'o.$set.foo': {$eq: null}}"
+ " ]}"
+ " ]},"
+ " {$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$not: {$eq: 'u'}}}"
+ " ]}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) {
+ CanInexactlyRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': 'b'}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6554,6 +6875,39 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': {$ne: 'b'}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
@@ -6580,8 +6934,59 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteStringEqPredicateOnFieldUpdateDescript
"]}"));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$eq: null}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize
+ // away the enclosing $and so that only non-updates will be returned from the oplog scan.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysFalse: 1}"
+ " ]},"
+ " {$or: ["
+ " {$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$exists: true}}"
+ " ]},"
+ " {op: {$not: {$eq: 'u'}}}"
+ " ]}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$exists: true}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be
+ // optimized away and will have no functional impact on the filter.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysTrue: 1}"
+ "]}"));
+}
+
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ CanInexactlyRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.removedFields': 'u.v'}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6589,12 +6994,32 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$ne: 'u.v'}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ CanInexactlyRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.removedFields': ['z']}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6602,6 +7027,26 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$ne: ['z']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
@@ -6645,11 +7090,16 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnFieldUpdateDescripti
ASSERT(rewrittenMatchExpression);
auto rewrittenPredicate = rewrittenMatchExpression->serialize();
- ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}"));
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}},"
+ " {$alwaysFalse: 1}"
+ "]}"));
}
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ CanInexactlyRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', ['y']]}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6657,12 +7107,32 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$nin: ['w', ['y']]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ CanInexactlyRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.removedFields': {$in: [/ab*c/, /de*f/]}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6670,12 +7140,32 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$nin: [/ab*c/, /de*f/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
TEST_F(ChangeStreamRewriteTest,
- CannotRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) {
+ CanInexactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.removedFields': {$lt: 'z'}}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6683,11 +7173,65 @@ TEST_F(ChangeStreamRewriteTest,
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields': {$not: {$lt: 'z'}}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
-TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) {
+TEST_F(ChangeStreamRewriteTest,
+ CanInexactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFieldsFoo) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields.foo': {$lt: 'z'}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFieldsFoo) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.removedFields.foo': {$not: {$lt: 'z'}}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+
+ ASSERT(rewrittenMatchExpression == nullptr);
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CanInexactlyRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) {
auto expCtx = getExpCtx();
auto expr = fromjson("{'updateDescription.truncatedArrays': []}");
auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
@@ -6695,6 +7239,26 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTr
auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
+ ASSERT(rewrittenMatchExpression);
+
+ // This produces a minimally selective filter which returns all non-replacement update events.
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$and: ["
+ " {op: {$eq: 'u'}},"
+ " {'o._id': {$not: {$exists: true}}}"
+ "]}"));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CannotExactlyRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'updateDescription.truncatedArrays': {$ne: []}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"});
ASSERT(rewrittenMatchExpression == nullptr);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index d7f646c8d10..8e51e29e7b9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/pipeline/document_source_change_stream_unwind_transaction.h"
#include "mongo/db/pipeline/change_stream_filter_helpers.h"
+#include "mongo/db/pipeline/change_stream_rewrite_helpers.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -385,9 +386,10 @@ void DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::
namespace change_stream_filter {
/**
- * Build a filter, similar to the optimized oplog filter, designed to reject oplog entries that we
- * know would eventually get rejected by the 'userMatch' filter if they continued through the rest
- * of the pipeline.
+ * Build a filter, similar to the optimized oplog filter, designed to reject individual transaction
+ * entries that we know would eventually get rejected by the 'userMatch' filter if they continued
+ * through the rest of the pipeline. We must also adjust the filter slightly for user rewrites, as
+ * events within a transaction do not have certain fields that are common to other oplog entries.
*
* NB: The new filter may contain references to strings in the BSONObj that 'userMatch' originated
* from. Callers that keep the new filter long-term should serialize and re-parse it to guard
@@ -395,11 +397,21 @@ namespace change_stream_filter {
*/
std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) {
- // The current implementation of the transaction filter is the same as the "operation filter"
- // applied to oplog entries that can be filtered early (CRUD operations and non-invalidating
- // commands). This filter includes a namespace filter, ensuring it will filter out all documents
- // that would be filtered out by the default 'ns' filter this stage gets initialized with.
- return change_stream_filter::buildOperationFilter(expCtx, userMatch);
+ // The transaction unwind filter is the same as the operation filter applied to the oplog. This
+ // includes a namespace filter, which ensures that it will discard all documents that would be
+ // filtered out by the default 'ns' filter this stage gets initialized with.
+ auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr));
+
+ // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
+ // this separately because we need to exclude certain fields from the user's filters. Unwound
+ // transaction events do not have these fields until we populate them from the commitTransaction
+ // event. We already applied these predicates during the oplog scan, so we know that they match.
+ static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"};
+ if (auto rewrittenMatch =
+ change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) {
+ unwindFilter->add(std::move(rewrittenMatch));
+ }
+ return MatchExpression::optimize(std::move(unwindFilter));
}
} // namespace change_stream_filter
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index cf23b7735b6..79e39f865d9 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -61,7 +61,8 @@ public:
FacetRequirement::kAllowed,
TransactionRequirement::kAllowed,
LookupRequirement::kAllowed,
- UnionRequirement::kAllowed};
+ UnionRequirement::kAllowed,
+ ChangeStreamRequirement::kAllowlist};
}
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {