diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-01-07 23:23:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-08 11:06:37 +0000 |
commit | 0cf0d832fd76a93496e5a6df71b417a5059b0bd8 (patch) | |
tree | a7ad3e5d32b58fb2c3de390d23da4c2df330cf70 | |
parent | ccaed20db0dcff679c8ac208fe08c4fda24ffd83 (diff) | |
download | mongo-0cf0d832fd76a93496e5a6df71b417a5059b0bd8.tar.gz |
SERVER-62081 Exhaustively test and fix change stream rewrites against null-equality predicates
(cherry picked from commit 401fff7b836f0ad71d0e9e714a5801287dba8ce1)
9 files changed, 1242 insertions, 224 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js index 4a293da3290..a9a7bfc3455 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js @@ -135,6 +135,12 @@ for (const op of ["insert", "update", "replace", "delete"]) { [[op, 3, 0], [op, 3, 1]], [1, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an {$eq: null} predicate on 'documentKey._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey._id": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this // predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2 // docs on each shard. @@ -150,6 +156,13 @@ for (const op of ["insert", "update", "replace", "delete"]) { [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out the '{$eq: null}' predicate on a field that doesn't exist in 'documentKey' but that + // does exist in some of the underlying documents. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey.z": {$eq: null}}}, + [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out an $expr predicate on the full 'documentKey' field. verifyOps( resumeAfterToken, diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js index 61ce95e1e79..8f7262553de 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js @@ -141,6 +141,13 @@ verifyNonInvalidatingOps(resumeAfterToken, [] /* expectedOps */, 0 /* expectedOplogRetDocsForEachShard */); +// Ensure that the '$eq: null' on operation type sub-field can be rewritten to the oplog format. The +// oplog cursor should return all documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {"operationType.subField": {$eq: null}}}, + ["insert", "update", "replace", "delete"] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + // Ensure that the '$match' on the operation type with '$in' is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: ["insert", "update"]}}}, diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js index 13fbbad16b5..fcbd2f6b9c6 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js @@ -141,6 +141,26 @@ for (const reverseShards of [false, true]) { [[op, 2, 0]], [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq:null predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq:null predicate on 'updateDescription.updatedFields'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on 'updateDescription.updatedFields'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq predicate on 'updateDescription.updatedFields.f'. verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}}, [[op, 3, 0]], @@ -171,6 +191,16 @@ for (const reverseShards of [false, true]) { [[op, 3, 1]], [0, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq:null predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'. verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}}, [[op, 2, 0], [op, 2, 1]], @@ -218,6 +248,11 @@ for (const reverseShards of [false, true]) { [[op, 2, 0], [op, 3, 0], [op, 3, 1]], [2, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an {$eq:null} predicate on 'updateDescription.updatedFields.g'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$eq: null}}}, + [[op, 2, 0], [op, 3, 0], [op, 3, 1]], + [2, 1] /* expectedOplogCursorReturnedDocs */); + // Test out a negated $eq predicate on 'updateDescription.removedFields'. verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}}, [[op, 3, 0], [op, 3, 1]], diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js new file mode 100644 index 00000000000..43df7cfd1bb --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js @@ -0,0 +1,286 @@ +/** + * Tests that change streams correctly handle rewrites of null, existence and equality checks, for + * both existent and non-existent fields and subfields. + * @tags: [ + * featureFlagChangeStreamsRewrite, + * requires_fcv_51, + * requires_pipeline_optimization, + * uses_change_streams + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. +load("jstests/libs/fixture_helpers.js"); // For isMongos. + +const dbName = "change_stream_rewrite_null_existence_test"; + +const collName = "coll1"; +const collNameAfterRename = "coll_renamed"; + +// Establish a resume token at a point before anything actually happens in the test. +const startPoint = db.getMongo().watch().getResumeToken(); + +// If this is a sharded passthrough, make sure we shard on something other than _id. +if (FixtureHelpers.isMongos(db)) { + assertDropCollection(db, collName); + assert.commandWorked(db.adminCommand({enableSharding: dbName})); + assert.commandWorked( + db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}})); +} + +const testDB = db.getSiblingDB(dbName); +let testColl = testDB[collName]; + +const numDocs = 8; + +// Generate the write workload. +(function performWriteWorkload() { + // Insert some documents. + for (let i = 0; i < numDocs; ++i) { + assert.commandWorked(testColl.insert( + {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false})); + } + + // Update half of them. We generate these updates individually so that they generate different + // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields. + const updateSpecs = [ + [{$set: {f2: true}}], // only populates 'updatedFields' + [{$unset: ["f1"]}], // only populates 'removedFields' + [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays' + [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields + ]; + for (let i = 0; i < numDocs / 2; ++i) { + assert.commandWorked( + testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)])); + } + + // Replace the other half. + for (let i = numDocs / 2; i < numDocs; ++i) { + assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i})); + } + + // Delete half of the updated documents. + for (let i = 0; i < numDocs / 4; ++i) { + assert.commandWorked(testColl.remove({_id: i, shardKey: i})); + } +})(); + +// Rename the collection. +assert.commandWorked(testColl.renameCollection(collNameAfterRename)); +testColl = testDB[collNameAfterRename]; + +// Drop the collection. +assert(testColl.drop()); + +// Drop the database. +assert.commandWorked(testDB.dropDatabase()); + +// Function to generate a list of all paths to be tested from those observed in the event stream. +function traverseEvent(event, outputMap, prefixPath = "") { + // The number of values to equality-test for each path. Set this to Infinity to test everything. + const maxValuesPerPath = 1; + + // Begin traversing through the event, adding paths and values into 'outputMap'. + for (let fieldName in event) { + const fieldPath = (prefixPath.length > 0 ? prefixPath + "." : "") + fieldName; + const fieldVal = event[fieldName]; + + // Create an entry for this field if it doesn't already exist. + if (!outputMap[fieldPath]) { + outputMap[fieldPath] = {extraValues: [], values: []}; + } + + // Add entries for each of the standard subfields that we test for every existent field. + for (let subField of standardSubFieldsToTest) { + const subFieldPathToAdd = fieldPath + "." + subField; + if (!outputMap[subFieldPathToAdd]) { + outputMap[subFieldPathToAdd] = {extraValues: [], values: []}; + } + } + + // Helper function to add a new value into the fields list. + function addToPredicatesList(fieldPath, fieldVal) { + const alreadyExists = + outputMap[fieldPath].values.some((elem) => friendlyEqual(elem, fieldVal)); + const numValues = outputMap[fieldPath].values.length; + if (!alreadyExists && numValues < maxValuesPerPath) { + outputMap[fieldPath].values.push(fieldVal); + } + } + + // Helper function to check whether this value is a plain old javascript object. + function isPlainObject(value) { + return (value && typeof (value) == "object" && value.constructor === Object); + } + + // Add a predicate on the full field, whether scalar, object, or array. + addToPredicatesList(fieldPath, fieldVal); + + // If the field is an object, traverse through it. + if (isPlainObject(fieldVal)) { + traverseEvent(fieldVal, outputMap, fieldPath); + } + + // If the field is an array, find any subobjects and traverse them. + if (Array.isArray(fieldVal)) { + for (let arrayElem of fieldVal) { + if (isPlainObject(arrayElem)) { + traverseEvent(arrayElem, outputMap, fieldPath); + } else { + addToPredicatesList(fieldPath, arrayElem); + } + } + // Traverse through the array itself as an object. This will descend into the array by + // index, allowing us to test fieldname-or-array-index matching semantics. + traverseEvent(fieldVal, outputMap, fieldPath); + } + } +} + +// Helper function to fully exhaust a change stream from the start point and return all events. +function getAllChangeStreamEvents(extraStages, csOptions) { + // Open a whole-cluster stream based on the supplied arguments. + const csCursor = testDB.getMongo().watch( + extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions)); + + // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will + // guarantee that all shards have returned results, and we expect all results to fit into a + // single batch, so we know we have exhausted the stream. + while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) { + csCursor.hasNext(); // runs a getMore + } + + // Close the cursor since we have already retrieved all results. + csCursor.close(); + + // Extract all events from the streams. Since the cursor is closed, it will not attempt to + // retrieve any more batches from the server. + return csCursor.toArray(); +} + +// Obtain a list of all events that occurred during the write workload. +const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"}); + +jsTestLog(`All events: ${tojson(allEvents)}`); + +// List of specific fields and values that we wish to test. This will be populated during traversal +// of the events in the stream, but we can add further paths and extra values which will not appear +// in the stream but which we nonetheless want to test. Note that null and existence predicates will +// be tested for every field, and do not need to be explicitly specified here. The format of each +// entry is as follows: +// +// { +// "full.path.to.field": { +// extraValues: [special values we wish to test that do not appear in the stream], +// values: [automatically populated by examining the stream] +// } +// } +const fieldsToBeTested = { + // Test documentKey with a field that is in the full object but not in the documentKey. + "documentKey": {extraValues: [{f2: null, _id: 1}], values: []}, + "documentKey.f1": {extraValues: [{subField: true}], values: []} +}; + +// Always test these subfields for all parent fields. +const standardSubFieldsToTest = ["nonExistentField"]; + +// Traverse each event in the stream and build up a map of all field paths. +allEvents.forEach((event) => traverseEvent(event, fieldsToBeTested)); + +jsTestLog(`Final set of fields to test: ${tojson(fieldsToBeTested)}`); + +// Define the filters that we want to apply to each field. +function generateMatchFilters(fieldPath) { + const valuesToTest = + fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues); + + const filters = [ + {[fieldPath]: {$eq: null}}, + {[fieldPath]: {$ne: null}}, + {[fieldPath]: {$lte: null}}, + {[fieldPath]: {$gte: null}}, + {[fieldPath]: {$exists: true}}, + {[fieldPath]: {$exists: false}} + ]; + + for (let value of valuesToTest) { + filters.push({[fieldPath]: value}); + } + + return filters; +} +function generateExprFilters(fieldPath) { + const valuesToTest = + fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues); + + const exprFieldPath = "$" + fieldPath; + const exprs = [ + {$expr: {$eq: [exprFieldPath, null]}}, + {$expr: {$ne: [exprFieldPath, null]}}, + {$expr: {$lte: [exprFieldPath, null]}}, + {$expr: {$gte: [exprFieldPath, null]}}, + {$expr: {$eq: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$ne: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$lte: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$gte: [exprFieldPath, "$$REMOVE"]}} + ]; + + for (let value of valuesToTest) { + exprs.push({$expr: {$eq: [exprFieldPath, value]}}); + } + + return exprs; +} + +// Record all failed test cases to be reported at the end of the test. +const failedTestCases = []; + +// Confirm that the output of an optimized change stream matches an unoptimized stream. +for (let csConfig of [{fullDocument: "updateLookup"}]) { + for (let fieldToTest in fieldsToBeTested) { + const predicatesToTest = + generateMatchFilters(fieldToTest).concat(generateExprFilters(fieldToTest)); + for (let predicate of predicatesToTest) { + // Create a $match expression for the current predicate. + const matchExpr = {$match: predicate}; + + jsTestLog(`Testing filter ${tojsononeline(matchExpr)} with ${tojsononeline(csConfig)}`); + + // Construct one optimized pipeline, and one which inhibits optimization. + const nonOptimizedPipeline = [{$_internalInhibitOptimization: {}}, matchExpr]; + const optimizedPipeline = [matchExpr]; + + // Extract all results from each of the pipelines. + const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig); + const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig); + + // We never expect to see more optimized results than unoptimized. + assert(optimizedOutput.length <= nonOptimizedOutput.length, + {optimizedOutput: optimizedOutput, nonOptimizedOutput: nonOptimizedOutput}); + + // Check the unoptimized results against the optimized results. If we observe an entry + // in the non-optimized array that is not present in the optimized, add the details to + // 'failedTestCases' and continue. + for (let i = 0; i < nonOptimizedOutput.length; ++i) { + try { + assert(i < optimizedOutput.length); + assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i])); + } catch (error) { + failedTestCases.push({ + matchExpr: matchExpr, + csConfig: csConfig, + events: {nonOptimized: nonOptimizedOutput[i], optimized: optimizedOutput[i]} + }); + jsTestLog(`Total failures: ${failedTestCases.length}`); + break; + } + } + } + } +} + +// Assert that there were no failed test cases. +assert(failedTestCases.length == 0, failedTestCases); +})();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp index 3dee9c53520..9e1cd0d308e 100644 --- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp @@ -49,6 +49,32 @@ using AggExpressionRewrite = namespace { /** + * Helpers to clone an expression to the same type and rename the fields to which it applies. + */ +std::unique_ptr<PathMatchExpression> cloneWithSubstitution( + const PathMatchExpression* predicate, const StringMap<std::string>& renameList) { + auto clonedPred = std::unique_ptr<PathMatchExpression>( + static_cast<PathMatchExpression*>(predicate->shallowClone().release())); + clonedPred->applyRename(renameList); + return clonedPred; +} +boost::intrusive_ptr<ExpressionFieldPath> cloneWithSubstitution( + const ExpressionFieldPath* expr, const StringMap<std::string>& renameList) { + return static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution(renameList).release()); +} + +/** + * Helper to resolve a predicate on a non-existent field to either AlwaysTrue or AlwaysFalse. + */ +std::unique_ptr<MatchExpression> resolvePredicateOnNonExistentField( + const PathMatchExpression* predicate) { + if (predicate->matchesSingleElement({})) { + return std::make_unique<AlwaysTrueMatchExpression>(); + } + return std::make_unique<AlwaysFalseMatchExpression>(); +} + +/** * Rewrites filters on 'operationType' in a format that can be applied directly to the oplog. * Returns nullptr if the predicate cannot be rewritten. * @@ -67,9 +93,9 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType( str::stream() << "Unexpected predicate on " << predicate->path(), predicate->fieldRef()->getPart(0) == DocumentSourceChangeStream::kOperationTypeField); - // If the query is on a subfield of operationType, it will never match. + // If the query is on a subfield of operationType, it will always be missing. if (predicate->fieldRef()->numParts() > 1) { - return std::make_unique<AlwaysFalseMatchExpression>(); + return resolvePredicateOnNonExistentField(predicate); } static const auto kExistsTrue = Document{{"$exists", true}}; @@ -223,8 +249,7 @@ std::unique_ptr<MatchExpression> matchRewriteDocumentKey( // Helper to generate a filter on the 'op' field for the specified type. This filter will also // include a copy of 'predicate' with the path renamed to apply to the oplog. auto generateFilterForOp = [&](StringData op, const StringMap<std::string>& renameList) { - auto renamedPredicate = predicate->shallowClone(); - static_cast<PathMatchExpression*>(renamedPredicate.get())->applyRename(renameList); + auto renamedPredicate = cloneWithSubstitution(predicate, renameList); auto andExpr = std::make_unique<AndMatchExpression>(); andExpr->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value(op))); @@ -235,13 +260,31 @@ std::unique_ptr<MatchExpression> matchRewriteDocumentKey( // The MatchExpression which will contain the final rewritten predicate. auto rewrittenPredicate = std::make_unique<OrMatchExpression>(); + // Handle the case of non-CRUD events. The 'documentKey' field never exists for such events, so + // we evaluate the predicate against a non-existent field to see whether it matches. + if (predicate->matchesSingleElement({})) { + auto nonCRUDCase = MatchExpressionParser::parseAndNormalize( + fromjson("{$nor: [{op: 'i'}, {op: 'u'}, {op: 'd'}]}"), expCtx); + rewrittenPredicate->add(std::move(nonCRUDCase)); + } + // Handle update, replace and delete. The predicate path can simply be renamed. rewrittenPredicate->add(generateFilterForOp("u"_sd, {{"documentKey", "o2"}})); rewrittenPredicate->add(generateFilterForOp("d"_sd, {{"documentKey", "o"}})); - // If the path is a subfield of 'documentKey', inserts can also be handled by renaming. + // If the path is a subfield of 'documentKey', inserts can also be handled by renaming, as long + // as the path starts with _id or the predicate does not match a missing field. if (predicate->fieldRef()->numParts() > 1) { - rewrittenPredicate->add(generateFilterForOp("i"_sd, {{"documentKey", "o"}})); + // If the predicate matches against a missing field and is on a field which exists in the + // full document but not the documentKey, then applying that predicate to the 'o' field in + // the oplog will discard entries that would have matched the eventual change stream event. + if (pathStartsWithDKId || !predicate->matchesSingleElement({})) { + rewrittenPredicate->add(generateFilterForOp("i"_sd, {{"documentKey", "o"}})); + } else { + // We can't rewrite the predicate, so we have to match all {op: "i"} events. + rewrittenPredicate->add( + std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd))); + } return rewrittenPredicate; } @@ -340,18 +383,16 @@ boost::intrusive_ptr<Expression> exprRewriteDocumentKey( std::vector<BSONObj> opCases; // Cases for 'insert' and 'delete'. - auto insertAndDeletePath = - static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution({{"documentKey", "o"}}).get()) - ->getFieldPathWithoutCurrentPrefix() - .fullPathWithPrefix(); + auto insertAndDeletePath = cloneWithSubstitution(expr, {{"documentKey", "o"}}) + ->getFieldPathWithoutCurrentPrefix() + .fullPathWithPrefix(); opCases.push_back( fromjson("{case: {$in: ['$op', ['i', 'd']]}, then: '" + insertAndDeletePath + "'}")); // Cases for 'update' and 'replace'. - auto updateAndReplacePath = - static_cast<ExpressionFieldPath*>(expr->copyWithSubstitution({{"documentKey", "o2"}}).get()) - ->getFieldPathWithoutCurrentPrefix() - .fullPathWithPrefix(); + auto updateAndReplacePath = cloneWithSubstitution(expr, {{"documentKey", "o2"}}) + ->getFieldPathWithoutCurrentPrefix() + .fullPathWithPrefix(); opCases.push_back( fromjson("{case: {$eq: ['$op', 'u']}, then: '" + updateAndReplacePath + "'}")); @@ -397,7 +438,7 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument( // {$or: [ // {$and: [{op: 'u'}, {'o._id': {$exists: false}}]}, // {$and: [ - // {$or: [{op: 'i'}, {op: 'u'}]}, + // {$or: [{op: 'i'}, {op: 'u', 'o._id': {$exists: true}}]}, // {o: <predicate>} // ]}, // // The following predicates are only present if the predicate matches a missing field @@ -418,14 +459,11 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument( // cases, because the full document is present in the oplog. auto insertOrReplaceCase = std::make_unique<AndMatchExpression>(); - auto insertOrReplaceOpFilter = std::make_unique<OrMatchExpression>(); - insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("i"_sd))); - insertOrReplaceOpFilter->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd))); + auto insertOrReplaceOpFilter = MatchExpressionParser::parseAndNormalize( + fromjson("{$or: [{op: 'i'}, {op: 'u', 'o._id': {$exists: true}}]}"), expCtx); insertOrReplaceCase->add(std::move(insertOrReplaceOpFilter)); - auto predForInsertOrReplace = predicate->shallowClone(); - static_cast<PathMatchExpression*>(predForInsertOrReplace.get()) - ->applyRename({{"fullDocument", "o"}}); + auto predForInsertOrReplace = cloneWithSubstitution(predicate, {{"fullDocument", "o"}}); insertOrReplaceCase->add(std::move(predForInsertOrReplace)); rewrittenPredicate->add(std::move(insertOrReplaceCase)); @@ -458,136 +496,193 @@ std::unique_ptr<MatchExpression> matchRewriteUpdateDescription( predicate->fieldRef()->getPart(0) == DocumentSourceChangeStream::kUpdateDescriptionField); - // Check that this is a non-replacement update, i.e. {op: "u", "o._id": {$exists: false}}. - auto rewrittenPredicate = std::make_unique<AndMatchExpression>(); - rewrittenPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd))); - rewrittenPredicate->add( - std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd))); - - // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can generate - // a rewritten predicate that matches exactly like so: - // - // {updateDescription.updatedFields.<fieldName>: <pred>} - // => - // {$and: [ - // {op: "u"}, - // {"o._id": {$exists: false}}, - // {$or: [ - // {o.diff.i.<fieldName>: <pred>}, - // {o.diff.u.<fieldName>: <pred>}, - // {o.$set.<fieldName>: <pred>} - // ]} - // ]} - if (predicate->fieldRef()->numParts() == 3 && - predicate->fieldRef()->getPart(1) == "updatedFields"_sd) { - // The oplog field corresponding to "updateDescription.updatedFields" can be in any one of - // three locations. Construct an $or filter to match against them all. - static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"}; - auto updatedFieldsOr = std::make_unique<OrMatchExpression>(); - for (auto&& oplogField : oplogFields) { - auto updateRewrite = predicate->shallowClone(); - static_cast<PathMatchExpression*>(updateRewrite.get()) - ->applyRename({{"updateDescription.updatedFields", oplogField}}); - updatedFieldsOr->add(std::move(updateRewrite)); - } - // Add the $or into the final rewritten predicate and return. - rewrittenPredicate->add(std::move(updatedFieldsOr)); - return rewrittenPredicate; - } - - // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can generate - // a rewritten predicate that matches exactly like so: - // - // {updateDescription.removedFields: {$eq: <fieldName>}} - // => - // {$and: [ - // {op: "u"}, - // {"o._id": {$exists: false}}, - // {$or: [ - // {o.diff.d.<fieldName>: {$exists: true}}, - // {o.$unset.<fieldName>: {$exists: true}} - // ]} - // ]} - // - // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}} - // => - // {$and: [ - // {op: "u"}, - // {"o._id": {$exists: false}}, - // {$or: [ - // {o.diff.d.<fieldName1>: {$exists: true}}, - // {o.$unset.<fieldName1>: {$exists: true}}, - // {o.diff.d.<fieldName2>: {$exists: true}}, - // {o.$unset.<fieldName2>: {$exists: true}}, - // .. - // ]} - // ]} - if (predicate->fieldRef()->numParts() == 2 && - predicate->fieldRef()->getPart(1) == "removedFields"_sd) { - // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog. - auto rewriteEqualityForOplog = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> { - // We can only rewrite equality matches on strings. - if (rhsElem.type() != BSONType::String) { - return nullptr; + // We can't determine whether we can perform a strict rewrite until we examine the predicate. We + // wrap this in a helper function that we can call while building the filter. We try to rewrite + // the predicate assuming that it will only be applied to non-replacement update oplog events. + auto tryExactRewriteForUpdateEvents = [](auto predicate) -> std::unique_ptr<MatchExpression> { + // $exists and null-equality checks on 'updateDescription' or its immediate subfields are + // AlwaysTrue or AlwaysFalse, since these fields are always present in the update event. + static const std::set<std::string> existentFields = {"updateDescription", + "updateDescription.updatedFields", + "updateDescription.removedFields", + "updateDescription.truncatedArrays"}; + if (existentFields.count(predicate->path().toString())) { + // An {$exists:true} predicate will always match against any of these fields. + if (predicate->matchType() == MatchExpression::EXISTS) { + return std::make_unique<AlwaysTrueMatchExpression>(); } - // We can only rewrite top-level fields, i.e. no dotted subpaths. - auto fieldName = rhsElem.str(); - if (FieldRef(fieldName).numParts() > 1) { - return nullptr; + // We check whether this is a ComparisonMatchExpression to ensure that the predicate is + // type-bracketed, which means that it will *only* match missing, null, or undefined. + // None of these fields will ever be null or undefined in the change stream event. + if (ComparisonMatchExpression::isComparisonMatchExpression(predicate) && + predicate->matchesSingleElement({})) { + return std::make_unique<AlwaysFalseMatchExpression>(); } - // The oplog field corresponding to "updateDescription.removedFields" can be in either - // of two locations. Construct an $or filter to match against them both. - static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"}; - auto removedFieldsOr = std::make_unique<OrMatchExpression>(); + } + + // For predicates on a non-dotted subfield of 'updateDescription.updatedFields' we can + // generate a rewritten predicate that matches exactly like so: + // + // {updateDescription.updatedFields.<fieldName>: <pred>} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.i.<fieldName>: <pred>}, + // {o.diff.u.<fieldName>: <pred>}, + // {o.$set.<fieldName>: <pred>} + // ]} + // ]} + if (predicate->fieldRef()->numParts() == 3 && + predicate->fieldRef()->getPart(1) == "updatedFields"_sd) { + // The oplog field corresponding to "updateDescription.updatedFields" can be in any one + // of three locations. We will attempt to construct a filter to match against them all. + static const std::vector<std::string> oplogFields = {"o.diff.i", "o.diff.u", "o.$set"}; + // If this predicate matches against a missing field, then we must apply an $and to all + // three potential locations, since at least two of them will always be missing. If not, + // then we build an $or to match if the field is present at any of the locations. + auto rewrittenUserPredicate = [predicate]() -> std::unique_ptr<ListOfMatchExpression> { + if (predicate->matchesSingleElement({})) { + return std::make_unique<AndMatchExpression>(); + } + return std::make_unique<OrMatchExpression>(); + }(); + // Rewrite the predicate for each of the three potential oplog locations. for (auto&& oplogField : oplogFields) { - removedFieldsOr->add( - std::make_unique<ExistsMatchExpression>(oplogField + "." + fieldName)); + rewrittenUserPredicate->add(cloneWithSubstitution( + predicate, {{"updateDescription.updatedFields", oplogField}})); } - return removedFieldsOr; - }; - - // We can only match against a limited number of predicates here, $eq and $in. - switch (predicate->matchType()) { - case MatchExpression::EQ: { - // Try to rewrite the predicate on "updateDescription.removedFields". - auto eqME = static_cast<const EqualityMatchExpression*>(predicate); - if (auto removedRewrite = rewriteEqualityForOplog(eqME->getData())) { - rewrittenPredicate->add(std::move(removedRewrite)); - return rewrittenPredicate; + // Return the final rewritten predicate. + return rewrittenUserPredicate; + } + + // For $eq predicates and $in predicates on 'updateDescription.removedFields' we can + // generate a rewritten predicate that matches exactly like so: + // + // {updateDescription.removedFields: {$eq: <fieldName>}} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.d.<fieldName>: {$exists: true}}, + // {o.$unset.<fieldName>: {$exists: true}} + // ]} + // ]} + // + // {updateDescription.removedFields: {$in: [<fieldName1>, <fieldName2>, ..]}} + // => + // {$and: [ + // {op: "u"}, + // {"o._id": {$exists: false}}, + // {$or: [ + // {o.diff.d.<fieldName1>: {$exists: true}}, + // {o.$unset.<fieldName1>: {$exists: true}}, + // {o.diff.d.<fieldName2>: {$exists: true}}, + // {o.$unset.<fieldName2>: {$exists: true}}, + // .. + // ]} + // ]} + if (predicate->fieldRef()->numParts() == 2 && + predicate->fieldRef()->getPart(1) == "removedFields"_sd) { + // Helper to rewrite an equality on "updateDescription.removedFields" into the oplog. + auto rewriteEqOnRemovedFields = [](auto& rhsElem) -> std::unique_ptr<MatchExpression> { + // We can only rewrite equality matches on strings. + if (rhsElem.type() != BSONType::String) { + return nullptr; } - break; - } - case MatchExpression::MATCH_IN: { - // If this $in includes any regexes, we can't proceed with the rewrite. - auto inME = static_cast<const InMatchExpression*>(predicate); - if (!inME->getRegexes().empty()) { + // We can only rewrite top-level fields, i.e. no dotted subpaths. + auto fieldName = rhsElem.str(); + if (FieldRef(fieldName).numParts() > 1) { return nullptr; } - // An empty '$in' should never match anything. - if (inME->getEqualities().empty()) { - return std::make_unique<AlwaysFalseMatchExpression>(); + // The oplog field corresponding to "updateDescription.removedFields" can be in + // either of two locations. Construct an $or filter to match against them both. + // Because we have already validated that this is an equality string match, we do + // not need to check whether the predicate matches a missing field in this case. + static const std::vector<std::string> oplogFields = {"o.diff.d", "o.$unset"}; + auto rewrittenEquality = std::make_unique<OrMatchExpression>(); + for (auto&& oplogField : oplogFields) { + rewrittenEquality->add( + std::make_unique<ExistsMatchExpression>(oplogField + "." + fieldName)); } - // Try to rewrite the $in as an $or of equalities on the oplog. If any individual - // rewrite fails, we must abandon the entire rewrite. - auto inRemovedOr = std::make_unique<OrMatchExpression>(); - for (const auto& rhsElem : inME->getEqualities()) { - if (auto removedRewrite = rewriteEqualityForOplog(rhsElem)) { - inRemovedOr->add(std::move(removedRewrite)); - } else { + return rewrittenEquality; + }; + + // We can only match against a limited number of predicates here, $eq and $in. + switch (predicate->matchType()) { + case MatchExpression::EQ: { + // Try to rewrite the predicate on "updateDescription.removedFields". + auto eqME = static_cast<const EqualityMatchExpression*>(predicate); + return rewriteEqOnRemovedFields(eqME->getData()); + } + case MatchExpression::MATCH_IN: { + // If this $in includes any regexes, we can't proceed with the rewrite. + auto inME = static_cast<const InMatchExpression*>(predicate); + if (!inME->getRegexes().empty()) { return nullptr; } + // An empty '$in' should never match anything. + if (inME->getEqualities().empty()) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + // Try to rewrite the $in as an $or of equalities on the oplog. If any + // individual rewrite fails, we must abandon the entire rewrite. + auto rewrittenUserPredicate = std::make_unique<OrMatchExpression>(); + for (const auto& rhsElem : inME->getEqualities()) { + if (auto rewrittenEquality = rewriteEqOnRemovedFields(rhsElem)) { + rewrittenUserPredicate->add(std::move(rewrittenEquality)); + } else { + return nullptr; + } + } + // Return the final rewritten predicate. + return rewrittenUserPredicate; } - // Add the rewritten $in to the final rewritten predicate and return. - rewrittenPredicate->add(std::move(inRemovedOr)); - return rewrittenPredicate; + default: + break; } - default: - break; } + // If we reach here, we cannot rewrite this predicate. + return nullptr; + }; + + // Try to rewrite the user predicate. If we can't, then we may not be able to continue. + auto rewrittenUserPredicate = tryExactRewriteForUpdateEvents(predicate); + + // If a strict rewrite is required and we could not rewrite the predicate, return nullptr. We + // also return nullptr if the predicate matches a missing field, since it is pointless to try + // to continue; we would have to return all updates, because we don't know whether they will + // match, and all non-updates, because they will always match. + if (!rewrittenUserPredicate && (!allowInexact || predicate->matchesSingleElement({}))) { + return nullptr; } - // If we reach here, we cannot perform a rewrite. - return nullptr; + // If we are here, then either we were able to rewrite the predicate, or we were not but an + // inexact rewrite is permissible. First write a predicate to check that this is an update + // that is not a full-document replacement, i.e. {op: "u", "o._id": {$exists: false}}. + std::unique_ptr<ListOfMatchExpression> finalPredicate = std::make_unique<AndMatchExpression>(); + finalPredicate->add(std::make_unique<EqualityMatchExpression>("op"_sd, Value("u"_sd))); + finalPredicate->add( + std::make_unique<NotMatchExpression>(std::make_unique<ExistsMatchExpression>("o._id"_sd))); + + // If we were able to rewrite the user predicate, add it into the final predicate. + if (rewrittenUserPredicate) { + finalPredicate->add(std::move(rewrittenUserPredicate)); + } + + // Handle the case of non-update events. The 'updateDescription' field never exists for these + // events, so we evaluate the predicate against a non-existent field to see whether it matches. + if (predicate->matchesSingleElement({})) { + auto nonUpdateCase = MatchExpressionParser::parseAndNormalize( + fromjson("{$or: [{op: {$ne: 'u'}}, {op: 'u', 'o._id': {$exists: true}}]}"), expCtx); + finalPredicate = std::make_unique<OrMatchExpression>(std::move(finalPredicate), nullptr); + finalPredicate->add(std::move(nonUpdateCase)); + } + + // Finally, we return the complete rewritten predicate. + return finalPredicate; } // Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName", @@ -1222,7 +1317,7 @@ boost::intrusive_ptr<Expression> rewriteAggExpressionTree( // Some paths can be rewritten just by renaming the path. if (renameRegistry.contains(firstPath)) { - return fieldExpr->copyWithSubstitution(renameRegistry).release(); + return cloneWithSubstitution(fieldExpr, renameRegistry); } // Other paths have custom rewrite logic. @@ -1363,9 +1458,7 @@ std::unique_ptr<MatchExpression> rewriteMatchExpressionTree( // Some paths can be rewritten just by renaming the path. if (renameRegistry.contains(firstPath)) { - auto renamedME = pathME->shallowClone(); - static_cast<PathMatchExpression*>(renamedME.get())->applyRename(renameRegistry); - return renamedME; + return cloneWithSubstitution(pathME, renameRegistry); } // Other paths have custom rewrite logic. @@ -1386,27 +1479,33 @@ std::unique_ptr<MatchExpression> rewriteMatchExpressionTree( std::unique_ptr<MatchExpression> rewriteFilterForFields( const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch, - std::set<std::string> fields) { + std::set<std::string> includeFields, + std::set<std::string> excludeFields) { // If we get null in, we return null immediately. if (!userMatch) { return nullptr; } // If the specified 'fields' set is empty, we rewrite every possible field. - if (fields.empty()) { + if (includeFields.empty()) { for (auto& rename : renameRegistry) { - fields.insert(rename.first); + includeFields.insert(rename.first); } for (auto& meRewrite : matchRewriteRegistry) { - fields.insert(meRewrite.first); + includeFields.insert(meRewrite.first); } for (auto& exprRewrite : exprRewriteRegistry) { - fields.insert(exprRewrite.first); + includeFields.insert(exprRewrite.first); } } + // Remove any fields which are present in the "excludeFields" list. + for (auto&& excludeField : excludeFields) { + includeFields.erase(excludeField); + } + // Attempt to rewrite the tree. Predicates on unknown or unrequested fields will be discarded. - return rewriteMatchExpressionTree(expCtx, userMatch, fields, true /* allowInexact */); + return rewriteMatchExpressionTree(expCtx, userMatch, includeFields, true /* allowInexact */); } } // namespace change_stream_rewrite } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.h b/src/mongo/db/pipeline/change_stream_rewrite_helpers.h index 6ba6c67cc0f..9df83fde24f 100644 --- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.h +++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.h @@ -40,13 +40,14 @@ namespace change_stream_rewrite { * 'userMatch' filter, reducing the amount of work that subsequent change streams stage will have to * do. * - * The rewrite splits out only the part of 'userMatch' that just needs those fields that are - * indicated in the 'fields' set. When 'fields' is the empty set, the rewrite includes all paths - * that can be rewritten. + * The rewrites will only be performed on fields which are present in the 'includeFields' set and + * absent from the 'excludeFields' set. When 'includeFields' is the empty set, the rewrite defaults + * to including all paths that can be rewritten. */ std::unique_ptr<MatchExpression> rewriteFilterForFields( const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch, - std::set<std::string> fields = {}); + std::set<std::string> includeFields = {}, + std::set<std::string> excludeFields = {}); } // namespace change_stream_rewrite } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5dcb4550ba3..c2cbe9ea701 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -3928,6 +3928,9 @@ public: } }; +// +// Logical rewrites +// TEST_F(ChangeStreamRewriteTest, RewriteOrPredicateOnRenameableFields) { auto spec = fromjson("{$or: [{clusterTime: {$type: [17]}}, {lsid: {$type: [16]}}]}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4045,57 +4048,6 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteExprWhenAllFieldsAreRenameable) { "{$isNumber: ['$lsid']}]}}")); } -TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) { - auto spec = fromjson("{$expr: {$eq: ['$operationType', 'insert']}}"); - auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); - ASSERT_OK(statusWithMatchExpression.getStatus()); - - auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( - getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); - - auto rewrittenPredicate = rewrittenMatchExpression->serialize(); - - const string expectedRewrite = R"( -{ - $expr: { - $eq: [ - { - $switch: { - branches: [ - {case: {$eq: ['$op', {$const: 'i'}]}, then: {$const: 'insert'}}, - { - case: { - $and: - [{$eq: ['$op', {$const: 'u'}]}, {$eq: ['$o._id', '$$REMOVE']}] - }, - then: {$const: 'update'} - }, - { - case: { - $and: - [{$eq: ['$op', {$const: 'u'}]}, {$ne: ['$o._id', '$$REMOVE']}] - }, - then: {$const: 'replace'} - }, - {case: {$eq: ['$op', {$const: 'd'}]}, then: {$const: 'delete'}}, - {case: {$ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}, - {case: {$ne: ['$o.drop', '$$REMOVE']}, then: {$const: 'drop'}}, - { - case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, - then: {$const: 'dropDatabase'} - }, - {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}} - ], - default: '$$REMOVE' - } - }, - {$const: 'insert'} - ] - } -})"; - ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson(expectedRewrite)); -} - TEST_F(ChangeStreamRewriteTest, CanInexactlyRewriteExprAndWithUnrewritableChild) { // Note that rewrite of "$$ROOT" without a path is unsupported. auto spec = fromjson("{$expr: {$and: [{$isNumber: '$lsid'}, {$isArray: '$$ROOT'}]}}"); @@ -4251,6 +4203,9 @@ TEST_F(ChangeStreamRewriteTest, DoesNotRewriteUnrequestedFieldInExpr) { ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$expr: {$and: [{$isNumber: ['$ts']}]}}")); } +// +// 'operationType' rewrites +// TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeWithInvalidOperandType) { auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("operationType" << 1), getExpCtx()); @@ -4275,6 +4230,19 @@ TEST_F(ChangeStreamRewriteTest, CannotRewriteEqPredicateOnOperationTypeWithUnkno ASSERT_FALSE(rewrittenMatchExpression); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationType) { + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("operationType" << BSONNULL), getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); +} + TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeInsert) { auto spec = fromjson("{operationType: 'insert'}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4368,6 +4336,19 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeSubField) { ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationTypeSubField) { + auto spec = fromjson("{'operationType.subField': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysTrue: 1}")); +} + TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) { auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("drop" << "insert"))); @@ -4463,6 +4444,60 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNinPredicateOnOperationType) { fromjson("{op: {$eq: 'i'}}")))))); } +TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) { + auto spec = fromjson("{$expr: {$eq: ['$operationType', 'insert']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"}); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + + const string expectedRewrite = R"( +{ + $expr: { + $eq: [ + { + $switch: { + branches: [ + {case: {$eq: ['$op', {$const: 'i'}]}, then: {$const: 'insert'}}, + { + case: { + $and: + [{$eq: ['$op', {$const: 'u'}]}, {$eq: ['$o._id', '$$REMOVE']}] + }, + then: {$const: 'update'} + }, + { + case: { + $and: + [{$eq: ['$op', {$const: 'u'}]}, {$ne: ['$o._id', '$$REMOVE']}] + }, + then: {$const: 'replace'} + }, + {case: {$eq: ['$op', {$const: 'd'}]}, then: {$const: 'delete'}}, + {case: {$ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}, + {case: {$ne: ['$o.drop', '$$REMOVE']}, then: {$const: 'drop'}}, + { + case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, + then: {$const: 'dropDatabase'} + }, + {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}} + ], + default: '$$REMOVE' + } + }, + {$const: 'insert'} + ] + } +})"; + ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson(expectedRewrite)); +} + +// +// 'documentKey' rewrites +// TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnFieldDocumentKey) { auto spec = fromjson("{documentKey: {_id: 'bar', foo: 'baz'}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4493,6 +4528,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnFieldDocumentKey) { "]}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKey) { + auto spec = fromjson("{documentKey: {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$nor: [" + " {op: {$eq: 'i'}}," + " {op: {$eq: 'u'}}," + " {op: {$eq: 'd'}}" + " ]}," + " {$and: [" + " {op: {$eq: 'u'}}," + " {o2: {$eq: null}}" + " ]}," + " {$and: [" + " {op: {$eq: 'd'}}," + " {o: {$eq: null}}" + " ]}," + " {$and: [" + " {op: {$eq: 'i'}}," + " {$alwaysFalse: 1}" + " ]}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnFieldDocumentKey) { auto spec = fromjson("{documentKey: {$in: [{}, {_id: 'bar'}, {_id: 'bar', foo: 'baz'}]}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4570,6 +4637,38 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldDocumentKeyId "]}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKeyId) { + auto spec = fromjson("{'documentKey._id': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$nor: [" + " {op: {$eq: 'i'}}," + " {op: {$eq: 'u'}}," + " {op: {$eq: 'd'}}" + " ]}," + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o2._id': {$eq: null}}" + " ]}," + " {$and: [" + " {op: {$eq: 'd'}}," + " {'o._id': {$eq: null}}" + " ]}," + " {$and: [" + " {op: {$eq: 'i'}}," + " {'o._id': {$eq: null}}" + " ]}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, CanExactlyRewriteExprPredicateOnFieldDocumentKeyId) { auto spec = fromjson("{$expr: {$lt: ['$documentKey._id', 'bar']}}"); @@ -4624,6 +4723,35 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldDocumentKeyFo "]}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldDocumentKeyFoo) { + auto spec = fromjson("{'documentKey.foo': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"documentKey"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$nor: [" + " {op: {$eq: 'i'}}," + " {op: {$eq: 'u'}}," + " {op: {$eq: 'd'}}" + " ]}," + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o2.foo': {$eq: null}}" + " ]}," + " {$and: [" + " {op: {$eq: 'd'}}," + " {'o.foo': {$eq: null}}" + " ]}," + " {op: {$eq: 'i'}}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldDocumentKey) { auto spec = fromjson("{documentKey: {$not: {$eq: {_id: 'bar'}}}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4721,6 +4849,9 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewriteExprPredicateOnFieldDocument ASSERT(rewrittenMatchExpression == nullptr); } +// +// 'fullDocument' rewrites +// TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentFoo) { auto spec = fromjson("{'fullDocument.foo': {$lt: 'bar'}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx()); @@ -4739,8 +4870,11 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldFullDocumentF " ]}," " {$and: [" " {$or: [" - " {op: {$eq: 'i'}}," - " {op: {$eq: 'u'}}" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$eq: 'i'}}" " ]}," " {'o.foo': {$lt: 'bar'}}" " ]}" @@ -4769,8 +4903,11 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNullComparisonPredicateOnFieldFullDocu " ]}," " {$and: [" " {$or: [" - " {op: {$eq: 'i'}}," - " {op: {$eq: 'u'}}" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$eq: 'i'}}" " ]}," " {'o.foo': {$eq: null}}" " ]}," @@ -4793,6 +4930,9 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocument ASSERT(rewrittenMatchExpression == nullptr); } +// +// 'ns' rewrites +// TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) { auto expCtx = getExpCtx(); auto statusWithMatchExpression = MatchExpressionParser::parse( @@ -5782,6 +5922,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnInvalidFieldPath) { fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}")); } +// +// 'to' rewrites +// TEST_F(ChangeStreamRewriteTest, CanRewriteFullToObject) { auto expCtx = getExpCtx(); auto statusWithMatchExpression = MatchExpressionParser::parse( @@ -6496,7 +6639,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteToWithExprOnInvalidCollSubFieldPath) { fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}")); } -TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription) { +// +// 'updateDescription' rewrites +// +TEST_F(ChangeStreamRewriteTest, CanInexactlyRewritePredicateOnFieldUpdateDescription) { auto expCtx = getExpCtx(); auto expr = fromjson( "{updateDescription: {updatedFields: {}, removedFields: [], truncatedArrays: []}}"); @@ -6505,11 +6651,82 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescription) auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldUpdateDescription) { + auto expCtx = getExpCtx(); + auto expr = fromjson( + "{updateDescription: {$ne: {updatedFields: {}, removedFields: [], truncatedArrays: []}}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } -TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUpdatedFields) { +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescription) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{updateDescription: {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize + // away the enclosing $and so that only non-updates will be returned from the oplog scan. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysFalse: 1}" + " ]}," + " {$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$not: {$eq: 'u'}}}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescription) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{updateDescription: {$exists: true}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be + // optimized away and will have no functional impact on the filter. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysTrue: 1}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CanInexactlyRewritePredicateOnFieldUpdateDescriptionUpdatedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.updatedFields': {}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6517,10 +6734,81 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionUp auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewritePredicateOnFieldUpdateDescriptionUpdatedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields': {$ne: {}}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize + // away the enclosing $and so that only non-updates will be returned from the oplog scan. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysFalse: 1}" + " ]}," + " {$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$not: {$eq: 'u'}}}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescriptionUpdatedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields': {$exists: true}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be + // optimized away and will have no functional impact on the filter. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysTrue: 1}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, CanRewriteArbitraryPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) { auto expCtx = getExpCtx(); @@ -6545,8 +6833,41 @@ TEST_F(ChangeStreamRewriteTest, "]}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFieldsFoo) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields.foo': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that we perform an $and of all three oplog locations for this rewrite. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$and: [" + " {'o.diff.i.foo': {$eq: null}}," + " {'o.diff.u.foo': {$eq: null}}," + " {'o.$set.foo': {$eq: null}}" + " ]}" + " ]}," + " {$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$not: {$eq: 'u'}}}" + " ]}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, - CannotRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) { + CanInexactlyRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': 'b'}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6554,6 +6875,39 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteEqPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': {$ne: 'b'}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + CannotRewriteEqNullPredicateOnFieldUpdateDescriptionUpdatedFieldsFooBar) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.updatedFields.foo.bar': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } @@ -6580,8 +6934,59 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteStringEqPredicateOnFieldUpdateDescript "]}")); } +TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$eq: null}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that this produces an {$alwaysFalse:1} predicate for update events. This will optimize + // away the enclosing $and so that only non-updates will be returned from the oplog scan. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysFalse: 1}" + " ]}," + " {$or: [" + " {$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$exists: true}}" + " ]}," + " {op: {$not: {$eq: 'u'}}}" + " ]}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteExistsPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$exists: true}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // Note that the {$alwaysTrue:1} predicate is an artefact of the rewrite process. It will be + // optimized away and will have no functional impact on the filter. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysTrue: 1}" + "]}")); +} + TEST_F(ChangeStreamRewriteTest, - CannotRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + CanInexactlyRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.removedFields': 'u.v'}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6589,12 +6994,32 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteDottedStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$ne: 'u.v'}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } TEST_F(ChangeStreamRewriteTest, - CannotRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + CanInexactlyRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.removedFields': ['z']}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6602,6 +7027,26 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteNonStringEqPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$ne: ['z']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } @@ -6645,11 +7090,16 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEmptyInPredicateOnFieldUpdateDescripti ASSERT(rewrittenMatchExpression); auto rewrittenPredicate = rewrittenMatchExpression->serialize(); - ASSERT_BSONOBJ_EQ(rewrittenPredicate, fromjson("{$alwaysFalse: 1}")); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}," + " {$alwaysFalse: 1}" + "]}")); } TEST_F(ChangeStreamRewriteTest, - CannotRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) { + CanInexactlyRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.removedFields': {$in: ['w', ['y']]}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6657,12 +7107,32 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteNonStringInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$nin: ['w', ['y']]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } TEST_F(ChangeStreamRewriteTest, - CannotRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) { + CanInexactlyRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.removedFields': {$in: [/ab*c/, /de*f/]}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6670,12 +7140,32 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteRegexInPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$nin: [/ab*c/, /de*f/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } TEST_F(ChangeStreamRewriteTest, - CannotRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) { + CanInexactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.removedFields': {$lt: 'z'}}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6683,11 +7173,65 @@ TEST_F(ChangeStreamRewriteTest, auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFields) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields': {$not: {$lt: 'z'}}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } -TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) { +TEST_F(ChangeStreamRewriteTest, + CanInexactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFieldsFoo) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields.foo': {$lt: 'z'}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewriteArbitraryPredicateOnFieldUpdateDescriptionRemovedFieldsFoo) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.removedFields.foo': {$not: {$lt: 'z'}}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + + ASSERT(rewrittenMatchExpression == nullptr); +} + +TEST_F(ChangeStreamRewriteTest, + CanInexactlyRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) { auto expCtx = getExpCtx(); auto expr = fromjson("{'updateDescription.truncatedArrays': []}"); auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); @@ -6695,6 +7239,26 @@ TEST_F(ChangeStreamRewriteTest, CannotRewritePredicateOnFieldUpdateDescriptionTr auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); + ASSERT(rewrittenMatchExpression); + + // This produces a minimally selective filter which returns all non-replacement update events. + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ(rewrittenPredicate, + fromjson("{$and: [" + " {op: {$eq: 'u'}}," + " {'o._id': {$not: {$exists: true}}}" + "]}")); +} + +TEST_F(ChangeStreamRewriteTest, + CannotExactlyRewritePredicateOnFieldUpdateDescriptionTruncatedArrays) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'updateDescription.truncatedArrays': {$ne: []}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"updateDescription"}); ASSERT(rewrittenMatchExpression == nullptr); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index d7f646c8d10..8e51e29e7b9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -34,6 +34,7 @@ #include "mongo/db/pipeline/document_source_change_stream_unwind_transaction.h" #include "mongo/db/pipeline/change_stream_filter_helpers.h" +#include "mongo/db/pipeline/change_stream_rewrite_helpers.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/transaction_history_iterator.h" @@ -385,9 +386,10 @@ void DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator:: namespace change_stream_filter { /** - * Build a filter, similar to the optimized oplog filter, designed to reject oplog entries that we - * know would eventually get rejected by the 'userMatch' filter if they continued through the rest - * of the pipeline. + * Build a filter, similar to the optimized oplog filter, designed to reject individual transaction + * entries that we know would eventually get rejected by the 'userMatch' filter if they continued + * through the rest of the pipeline. We must also adjust the filter slightly for user rewrites, as + * events within a transaction do not have certain fields that are common to other oplog entries. * * NB: The new filter may contain references to strings in the BSONObj that 'userMatch' originated * from. Callers that keep the new filter long-term should serialize and re-parse it to guard @@ -395,11 +397,21 @@ namespace change_stream_filter { */ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) { - // The current implementation of the transaction filter is the same as the "operation filter" - // applied to oplog entries that can be filtered early (CRUD operations and non-invalidating - // commands). This filter includes a namespace filter, ensuring it will filter out all documents - // that would be filtered out by the default 'ns' filter this stage gets initialized with. - return change_stream_filter::buildOperationFilter(expCtx, userMatch); + // The transaction unwind filter is the same as the operation filter applied to the oplog. This + // includes a namespace filter, which ensures that it will discard all documents that would be + // filtered out by the default 'ns' filter this stage gets initialized with. + auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr)); + + // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do + // this separately because we need to exclude certain fields from the user's filters. Unwound + // transaction events do not have these fields until we populate them from the commitTransaction + // event. We already applied these predicates during the oplog scan, so we know that they match. + static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"}; + if (auto rewrittenMatch = + change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) { + unwindFilter->add(std::move(rewrittenMatch)); + } + return MatchExpression::optimize(std::move(unwindFilter)); } } // namespace change_stream_filter diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index cf23b7735b6..79e39f865d9 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -61,7 +61,8 @@ public: FacetRequirement::kAllowed, TransactionRequirement::kAllowed, LookupRequirement::kAllowed, - UnionRequirement::kAllowed}; + UnionRequirement::kAllowed, + ChangeStreamRequirement::kAllowlist}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { |