diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-01-07 23:23:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-07 23:51:07 +0000 |
commit | 401fff7b836f0ad71d0e9e714a5801287dba8ce1 (patch) | |
tree | 6934d93693e6dd76b45791c4a6071f1804e38022 /jstests/change_streams | |
parent | 872e9856b0ef0216c2ecd21acb76f40ad881e9db (diff) | |
download | mongo-401fff7b836f0ad71d0e9e714a5801287dba8ce1.tar.gz |
SERVER-62081 Exhaustively test and fix change stream rewrites against null-equality predicates
Diffstat (limited to 'jstests/change_streams')
4 files changed, 341 insertions, 0 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js index 4a293da3290..a9a7bfc3455 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js @@ -135,6 +135,12 @@ for (const op of ["insert", "update", "replace", "delete"]) { [[op, 3, 0], [op, 3, 1]], [1, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an {$eq: null} predicate on 'documentKey._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey._id": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this // predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2 // docs on each shard. @@ -150,6 +156,13 @@ for (const op of ["insert", "update", "replace", "delete"]) { [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out the '{$eq: null}' predicate on a field that doesn't exist in 'documentKey' but that + // does exist in some of the underlying documents. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey.z": {$eq: null}}}, + [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out an $expr predicate on the full 'documentKey' field. verifyOps( resumeAfterToken, diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js index 61ce95e1e79..8f7262553de 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js @@ -141,6 +141,13 @@ verifyNonInvalidatingOps(resumeAfterToken, [] /* expectedOps */, 0 /* expectedOplogRetDocsForEachShard */); +// Ensure that the '$eq: null' on operation type sub-field can be rewritten to the oplog format. The +// oplog cursor should return all documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {"operationType.subField": {$eq: null}}}, + ["insert", "update", "replace", "delete"] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + // Ensure that the '$match' on the operation type with '$in' is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: ["insert", "update"]}}}, diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js index 13fbbad16b5..fcbd2f6b9c6 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js @@ -141,6 +141,26 @@ for (const reverseShards of [false, true]) { [[op, 2, 0]], [2, 2] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq:null predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on the full 'updateDescription' field. + verifyOps({$match: {operationType: op, updateDescription: {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out an $eq:null predicate on 'updateDescription.updatedFields'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on 'updateDescription.updatedFields'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq predicate on 'updateDescription.updatedFields.f'. verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}}, [[op, 3, 0]], @@ -171,6 +191,16 @@ for (const reverseShards of [false, true]) { [[op, 3, 1]], [0, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an $eq:null predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$eq: null}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $exists predicate on 'updateDescription.removedFields'. + verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$exists: false}}}, + [], + [0, 0] /* expectedOplogCursorReturnedDocs */); + // Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'. verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}}, [[op, 2, 0], [op, 2, 1]], @@ -218,6 +248,11 @@ for (const reverseShards of [false, true]) { [[op, 2, 0], [op, 3, 0], [op, 3, 1]], [2, 1] /* expectedOplogCursorReturnedDocs */); + // Test out an {$eq:null} predicate on 'updateDescription.updatedFields.g'. + verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$eq: null}}}, + [[op, 2, 0], [op, 3, 0], [op, 3, 1]], + [2, 1] /* expectedOplogCursorReturnedDocs */); + // Test out a negated $eq predicate on 'updateDescription.removedFields'. verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}}, [[op, 3, 0], [op, 3, 1]], diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js new file mode 100644 index 00000000000..43df7cfd1bb --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js @@ -0,0 +1,286 @@ +/** + * Tests that change streams correctly handle rewrites of null, existence and equality checks, for + * both existent and non-existent fields and subfields. + * @tags: [ + * featureFlagChangeStreamsRewrite, + * requires_fcv_51, + * requires_pipeline_optimization, + * uses_change_streams + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. +load("jstests/libs/fixture_helpers.js"); // For isMongos. + +const dbName = "change_stream_rewrite_null_existence_test"; + +const collName = "coll1"; +const collNameAfterRename = "coll_renamed"; + +// Establish a resume token at a point before anything actually happens in the test. +const startPoint = db.getMongo().watch().getResumeToken(); + +// If this is a sharded passthrough, make sure we shard on something other than _id. +if (FixtureHelpers.isMongos(db)) { + assertDropCollection(db, collName); + assert.commandWorked(db.adminCommand({enableSharding: dbName})); + assert.commandWorked( + db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}})); +} + +const testDB = db.getSiblingDB(dbName); +let testColl = testDB[collName]; + +const numDocs = 8; + +// Generate the write workload. +(function performWriteWorkload() { + // Insert some documents. + for (let i = 0; i < numDocs; ++i) { + assert.commandWorked(testColl.insert( + {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false})); + } + + // Update half of them. We generate these updates individually so that they generate different + // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields. + const updateSpecs = [ + [{$set: {f2: true}}], // only populates 'updatedFields' + [{$unset: ["f1"]}], // only populates 'removedFields' + [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays' + [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields + ]; + for (let i = 0; i < numDocs / 2; ++i) { + assert.commandWorked( + testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)])); + } + + // Replace the other half. + for (let i = numDocs / 2; i < numDocs; ++i) { + assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i})); + } + + // Delete half of the updated documents. + for (let i = 0; i < numDocs / 4; ++i) { + assert.commandWorked(testColl.remove({_id: i, shardKey: i})); + } +})(); + +// Rename the collection. +assert.commandWorked(testColl.renameCollection(collNameAfterRename)); +testColl = testDB[collNameAfterRename]; + +// Drop the collection. +assert(testColl.drop()); + +// Drop the database. +assert.commandWorked(testDB.dropDatabase()); + +// Function to generate a list of all paths to be tested from those observed in the event stream. +function traverseEvent(event, outputMap, prefixPath = "") { + // The number of values to equality-test for each path. Set this to Infinity to test everything. + const maxValuesPerPath = 1; + + // Begin traversing through the event, adding paths and values into 'outputMap'. + for (let fieldName in event) { + const fieldPath = (prefixPath.length > 0 ? prefixPath + "." : "") + fieldName; + const fieldVal = event[fieldName]; + + // Create an entry for this field if it doesn't already exist. + if (!outputMap[fieldPath]) { + outputMap[fieldPath] = {extraValues: [], values: []}; + } + + // Add entries for each of the standard subfields that we test for every existent field. + for (let subField of standardSubFieldsToTest) { + const subFieldPathToAdd = fieldPath + "." + subField; + if (!outputMap[subFieldPathToAdd]) { + outputMap[subFieldPathToAdd] = {extraValues: [], values: []}; + } + } + + // Helper function to add a new value into the fields list. + function addToPredicatesList(fieldPath, fieldVal) { + const alreadyExists = + outputMap[fieldPath].values.some((elem) => friendlyEqual(elem, fieldVal)); + const numValues = outputMap[fieldPath].values.length; + if (!alreadyExists && numValues < maxValuesPerPath) { + outputMap[fieldPath].values.push(fieldVal); + } + } + + // Helper function to check whether this value is a plain old javascript object. + function isPlainObject(value) { + return (value && typeof (value) == "object" && value.constructor === Object); + } + + // Add a predicate on the full field, whether scalar, object, or array. + addToPredicatesList(fieldPath, fieldVal); + + // If the field is an object, traverse through it. + if (isPlainObject(fieldVal)) { + traverseEvent(fieldVal, outputMap, fieldPath); + } + + // If the field is an array, find any subobjects and traverse them. + if (Array.isArray(fieldVal)) { + for (let arrayElem of fieldVal) { + if (isPlainObject(arrayElem)) { + traverseEvent(arrayElem, outputMap, fieldPath); + } else { + addToPredicatesList(fieldPath, arrayElem); + } + } + // Traverse through the array itself as an object. This will descend into the array by + // index, allowing us to test fieldname-or-array-index matching semantics. + traverseEvent(fieldVal, outputMap, fieldPath); + } + } +} + +// Helper function to fully exhaust a change stream from the start point and return all events. +function getAllChangeStreamEvents(extraStages, csOptions) { + // Open a whole-cluster stream based on the supplied arguments. + const csCursor = testDB.getMongo().watch( + extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions)); + + // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will + // guarantee that all shards have returned results, and we expect all results to fit into a + // single batch, so we know we have exhausted the stream. + while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) { + csCursor.hasNext(); // runs a getMore + } + + // Close the cursor since we have already retrieved all results. + csCursor.close(); + + // Extract all events from the streams. Since the cursor is closed, it will not attempt to + // retrieve any more batches from the server. + return csCursor.toArray(); +} + +// Obtain a list of all events that occurred during the write workload. +const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"}); + +jsTestLog(`All events: ${tojson(allEvents)}`); + +// List of specific fields and values that we wish to test. This will be populated during traversal +// of the events in the stream, but we can add further paths and extra values which will not appear +// in the stream but which we nonetheless want to test. Note that null and existence predicates will +// be tested for every field, and do not need to be explicitly specified here. The format of each +// entry is as follows: +// +// { +// "full.path.to.field": { +// extraValues: [special values we wish to test that do not appear in the stream], +// values: [automatically populated by examining the stream] +// } +// } +const fieldsToBeTested = { + // Test documentKey with a field that is in the full object but not in the documentKey. + "documentKey": {extraValues: [{f2: null, _id: 1}], values: []}, + "documentKey.f1": {extraValues: [{subField: true}], values: []} +}; + +// Always test these subfields for all parent fields. +const standardSubFieldsToTest = ["nonExistentField"]; + +// Traverse each event in the stream and build up a map of all field paths. +allEvents.forEach((event) => traverseEvent(event, fieldsToBeTested)); + +jsTestLog(`Final set of fields to test: ${tojson(fieldsToBeTested)}`); + +// Define the filters that we want to apply to each field. +function generateMatchFilters(fieldPath) { + const valuesToTest = + fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues); + + const filters = [ + {[fieldPath]: {$eq: null}}, + {[fieldPath]: {$ne: null}}, + {[fieldPath]: {$lte: null}}, + {[fieldPath]: {$gte: null}}, + {[fieldPath]: {$exists: true}}, + {[fieldPath]: {$exists: false}} + ]; + + for (let value of valuesToTest) { + filters.push({[fieldPath]: value}); + } + + return filters; +} +function generateExprFilters(fieldPath) { + const valuesToTest = + fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues); + + const exprFieldPath = "$" + fieldPath; + const exprs = [ + {$expr: {$eq: [exprFieldPath, null]}}, + {$expr: {$ne: [exprFieldPath, null]}}, + {$expr: {$lte: [exprFieldPath, null]}}, + {$expr: {$gte: [exprFieldPath, null]}}, + {$expr: {$eq: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$ne: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$lte: [exprFieldPath, "$$REMOVE"]}}, + {$expr: {$gte: [exprFieldPath, "$$REMOVE"]}} + ]; + + for (let value of valuesToTest) { + exprs.push({$expr: {$eq: [exprFieldPath, value]}}); + } + + return exprs; +} + +// Record all failed test cases to be reported at the end of the test. +const failedTestCases = []; + +// Confirm that the output of an optimized change stream matches an unoptimized stream. +for (let csConfig of [{fullDocument: "updateLookup"}]) { + for (let fieldToTest in fieldsToBeTested) { + const predicatesToTest = + generateMatchFilters(fieldToTest).concat(generateExprFilters(fieldToTest)); + for (let predicate of predicatesToTest) { + // Create a $match expression for the current predicate. + const matchExpr = {$match: predicate}; + + jsTestLog(`Testing filter ${tojsononeline(matchExpr)} with ${tojsononeline(csConfig)}`); + + // Construct one optimized pipeline, and one which inhibits optimization. + const nonOptimizedPipeline = [{$_internalInhibitOptimization: {}}, matchExpr]; + const optimizedPipeline = [matchExpr]; + + // Extract all results from each of the pipelines. + const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig); + const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig); + + // We never expect to see more optimized results than unoptimized. + assert(optimizedOutput.length <= nonOptimizedOutput.length, + {optimizedOutput: optimizedOutput, nonOptimizedOutput: nonOptimizedOutput}); + + // Check the unoptimized results against the optimized results. If we observe an entry + // in the non-optimized array that is not present in the optimized, add the details to + // 'failedTestCases' and continue. + for (let i = 0; i < nonOptimizedOutput.length; ++i) { + try { + assert(i < optimizedOutput.length); + assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i])); + } catch (error) { + failedTestCases.push({ + matchExpr: matchExpr, + csConfig: csConfig, + events: {nonOptimized: nonOptimizedOutput[i], optimized: optimizedOutput[i]} + }); + jsTestLog(`Total failures: ${failedTestCases.length}`); + break; + } + } + } + } +} + +// Assert that there were no failed test cases. +assert(failedTestCases.length == 0, failedTestCases); +})();
\ No newline at end of file |