summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-01-07 23:23:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-07 23:51:07 +0000
commit401fff7b836f0ad71d0e9e714a5801287dba8ce1 (patch)
tree6934d93693e6dd76b45791c4a6071f1804e38022 /jstests/change_streams
parent872e9856b0ef0216c2ecd21acb76f40ad881e9db (diff)
downloadmongo-401fff7b836f0ad71d0e9e714a5801287dba8ce1.tar.gz
SERVER-62081 Exhaustively test and fix change stream rewrites against null-equality predicates
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js13
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js7
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js35
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js286
4 files changed, 341 insertions, 0 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
index 4a293da3290..a9a7bfc3455 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
@@ -135,6 +135,12 @@ for (const op of ["insert", "update", "replace", "delete"]) {
[[op, 3, 0], [op, 3, 1]],
[1, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an {$eq: null} predicate on 'documentKey._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey._id": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this
// predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2
// docs on each shard.
@@ -150,6 +156,13 @@ for (const op of ["insert", "update", "replace", "delete"]) {
[[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
[2, 2] /* expectedOplogCursorReturnedDocs */);
+ // Test out the '{$eq: null}' predicate on a field that doesn't exist in 'documentKey' but that
+ // does exist in some of the underlying documents.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey.z": {$eq: null}}},
+ [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
// Test out an $expr predicate on the full 'documentKey' field.
verifyOps(
resumeAfterToken,
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
index 61ce95e1e79..8f7262553de 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
@@ -141,6 +141,13 @@ verifyNonInvalidatingOps(resumeAfterToken,
[] /* expectedOps */,
0 /* expectedOplogRetDocsForEachShard */);
+// Ensure that the '$eq: null' on operation type sub-field can be rewritten to the oplog format. The
+// oplog cursor should return all documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {"operationType.subField": {$eq: null}}},
+ ["insert", "update", "replace", "delete"] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
// Ensure that the '$match' on the operation type with '$in' is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: ["insert", "update"]}}},
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
index 13fbbad16b5..fcbd2f6b9c6 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_updateDescription_rewrite.js
@@ -141,6 +141,26 @@ for (const reverseShards of [false, true]) {
[[op, 2, 0]],
[2, 2] /* expectedOplogCursorReturnedDocs */);
+ // Test out an $eq:null predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on the full 'updateDescription' field.
+ verifyOps({$match: {operationType: op, updateDescription: {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $eq:null predicate on 'updateDescription.updatedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on 'updateDescription.updatedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields": {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out an $eq predicate on 'updateDescription.updatedFields.f'.
verifyOps({$match: {operationType: op, "updateDescription.updatedFields.f": "b"}},
[[op, 3, 0]],
@@ -171,6 +191,16 @@ for (const reverseShards of [false, true]) {
[[op, 3, 1]],
[0, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an $eq:null predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$eq: null}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $exists predicate on 'updateDescription.removedFields'.
+ verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$exists: false}}},
+ [],
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
// Test out a non-dotted string $eq predicate on 'updateDescription.removedFields'.
verifyOps({$match: {operationType: op, "updateDescription.removedFields": "z"}},
[[op, 2, 0], [op, 2, 1]],
@@ -218,6 +248,11 @@ for (const reverseShards of [false, true]) {
[[op, 2, 0], [op, 3, 0], [op, 3, 1]],
[2, 1] /* expectedOplogCursorReturnedDocs */);
+ // Test out an {$eq:null} predicate on 'updateDescription.updatedFields.g'.
+ verifyOps({$match: {operationType: op, "updateDescription.updatedFields.g": {$eq: null}}},
+ [[op, 2, 0], [op, 3, 0], [op, 3, 1]],
+ [2, 1] /* expectedOplogCursorReturnedDocs */);
+
// Test out a negated $eq predicate on 'updateDescription.removedFields'.
verifyOps({$match: {operationType: op, "updateDescription.removedFields": {$not: {$eq: "z"}}}},
[[op, 3, 0], [op, 3, 1]],
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
new file mode 100644
index 00000000000..43df7cfd1bb
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
@@ -0,0 +1,286 @@
+/**
+ * Tests that change streams correctly handle rewrites of null, existence and equality checks, for
+ * both existent and non-existent fields and subfields.
+ * @tags: [
+ * featureFlagChangeStreamsRewrite,
+ * requires_fcv_51,
+ * requires_pipeline_optimization,
+ * uses_change_streams
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+load("jstests/libs/fixture_helpers.js"); // For isMongos.
+
+// Database used for the test collection; the whole database is dropped at the end of the write
+// workload so that the stream also contains non-CRUD events.
+const dbName = "change_stream_rewrite_null_existence_test";
+
+const collName = "coll1";
+const collNameAfterRename = "coll_renamed";
+
+// Establish a resume token at a point before anything actually happens in the test. Every change
+// stream opened later resumes from this token, so each observes the complete write workload.
+const startPoint = db.getMongo().watch().getResumeToken();
+
+// If this is a sharded passthrough, make sure we shard on something other than _id.
+// NOTE(review): 'assertDropCollection' is assumed to be provided by the loaded helper libraries.
+// Also note the drop targets the connection's default 'db' while the sharded collection is
+// created in 'dbName' — confirm this is intended for the passthrough suites.
+if (FixtureHelpers.isMongos(db)) {
+ assertDropCollection(db, collName);
+ assert.commandWorked(db.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(
+ db.adminCommand({shardCollection: `${dbName}.${collName}`, key: {shardKey: "hashed"}}));
+}
+
+const testDB = db.getSiblingDB(dbName);
+let testColl = testDB[collName];
+
+// Number of documents inserted by the write workload below; updates, replacements and deletes
+// below operate on fractions (1/2, 1/4) of this count.
+const numDocs = 8;
+
+// Generate the write workload: inserts, updates, replacements and deletes, so that the stream
+// contains every CRUD event type with a variety of populated and absent subfields.
+(function performWriteWorkload() {
+ // Insert some documents. Each has an array with nested array and subobject ('a'), a subdocument
+ // field ('f1') and a scalar field ('f2'), giving the traversal below a range of field shapes.
+ for (let i = 0; i < numDocs; ++i) {
+ assert.commandWorked(testColl.insert(
+ {_id: i, shardKey: i, a: [1, [2], {b: 3}], f1: {subField: true}, f2: false}));
+ }
+
+ // Update half of them. We generate these updates individually so that they generate different
+ // values for the 'updatedFields', 'removedFields' and 'truncatedArrays' subfields. Note these
+ // are pipeline-style updates (arrays of stages).
+ const updateSpecs = [
+ [{$set: {f2: true}}], // only populates 'updatedFields'
+ [{$unset: ["f1"]}], // only populates 'removedFields'
+ [{$set: {a: [1, [2]]}}], // only populates 'truncatedArrays'
+ [{$set: {a: [1, [2]], f2: true}}, {$unset: ["f1"]}] // populates all fields
+ ];
+ for (let i = 0; i < numDocs / 2; ++i) {
+ assert.commandWorked(
+ testColl.update({_id: i, shardKey: i}, updateSpecs[(i % updateSpecs.length)]));
+ }
+
+ // Replace the other half. The replacement documents retain only '_id' and 'shardKey', so the
+ // other fields disappear from the post-image.
+ for (let i = numDocs / 2; i < numDocs; ++i) {
+ assert.commandWorked(testColl.replaceOne({_id: i, shardKey: i}, {_id: i, shardKey: i}));
+ }
+
+ // Delete half of the updated documents.
+ for (let i = 0; i < numDocs / 4; ++i) {
+ assert.commandWorked(testColl.remove({_id: i, shardKey: i}));
+ }
+})();
+
+// Rename the collection, then drop it and the database.
+// NOTE(review): presumably these exist so that the change stream also contains rename, drop and
+// dropDatabase events in addition to the CRUD events above — confirm against the stream output.
+assert.commandWorked(testColl.renameCollection(collNameAfterRename));
+testColl = testDB[collNameAfterRename];
+
+// Drop the collection.
+assert(testColl.drop());
+
+// Drop the database.
+assert.commandWorked(testDB.dropDatabase());
+
+// Function to generate a list of all paths to be tested from those observed in the event stream.
+// Recursively walks 'event', recording every dotted field path (rooted at 'prefixPath') into
+// 'outputMap' in the form {path: {extraValues: [...], values: [...]}}. For each path it also
+// records up to 'maxValuesPerPath' observed values to be used later in equality predicates, and
+// registers the standard non-existent subfields from 'standardSubFieldsToTest'.
+function traverseEvent(event, outputMap, prefixPath = "") {
+ // The number of values to equality-test for each path. Set this to Infinity to test everything.
+ const maxValuesPerPath = 1;
+
+ // Begin traversing through the event, adding paths and values into 'outputMap'.
+ for (let fieldName in event) {
+ const fieldPath = (prefixPath.length > 0 ? prefixPath + "." : "") + fieldName;
+ const fieldVal = event[fieldName];
+
+ // Create an entry for this field if it doesn't already exist.
+ if (!outputMap[fieldPath]) {
+ outputMap[fieldPath] = {extraValues: [], values: []};
+ }
+
+ // Add entries for each of the standard subfields that we test for every existent field.
+ // These paths get null/existence predicates even though they never appear in any event.
+ for (let subField of standardSubFieldsToTest) {
+ const subFieldPathToAdd = fieldPath + "." + subField;
+ if (!outputMap[subFieldPathToAdd]) {
+ outputMap[subFieldPathToAdd] = {extraValues: [], values: []};
+ }
+ }
+
+ // Helper function to add a new value into the fields list. Deduplicates via the shell's
+ // 'friendlyEqual' and respects the 'maxValuesPerPath' cap.
+ function addToPredicatesList(fieldPath, fieldVal) {
+ const alreadyExists =
+ outputMap[fieldPath].values.some((elem) => friendlyEqual(elem, fieldVal));
+ const numValues = outputMap[fieldPath].values.length;
+ if (!alreadyExists && numValues < maxValuesPerPath) {
+ outputMap[fieldPath].values.push(fieldVal);
+ }
+ }
+
+ // Helper function to check whether this value is a plain old javascript object. Excludes
+ // arrays and BSON wrapper types, whose constructors are not Object.
+ function isPlainObject(value) {
+ return (value && typeof (value) == "object" && value.constructor === Object);
+ }
+
+ // Add a predicate on the full field, whether scalar, object, or array.
+ addToPredicatesList(fieldPath, fieldVal);
+
+ // If the field is an object, traverse through it.
+ if (isPlainObject(fieldVal)) {
+ traverseEvent(fieldVal, outputMap, fieldPath);
+ }
+
+ // If the field is an array, find any subobjects and traverse them.
+ if (Array.isArray(fieldVal)) {
+ for (let arrayElem of fieldVal) {
+ if (isPlainObject(arrayElem)) {
+ traverseEvent(arrayElem, outputMap, fieldPath);
+ } else {
+ addToPredicatesList(fieldPath, arrayElem);
+ }
+ }
+ // Traverse through the array itself as an object. This will descend into the array by
+ // index, allowing us to test fieldname-or-array-index matching semantics.
+ traverseEvent(fieldVal, outputMap, fieldPath);
+ }
+ }
+}
+
+// Helper function to fully exhaust a change stream from the start point and return all events.
+// 'extraStages' is an array of extra pipeline stages appended to the stream; 'csOptions' is merged
+// into the change stream options. Returns the array of all events observed since 'startPoint'.
+function getAllChangeStreamEvents(extraStages, csOptions) {
+ // Open a whole-cluster stream based on the supplied arguments. 'maxAwaitTimeMS: 1' keeps each
+ // getMore from blocking when no results are available.
+ const csCursor = testDB.getMongo().watch(
+ extraStages, Object.assign({resumeAfter: startPoint, maxAwaitTimeMS: 1}, csOptions));
+
+ // Run getMore until the post-batch resume token advances. In a sharded passthrough, this will
+ // guarantee that all shards have returned results, and we expect all results to fit into a
+ // single batch, so we know we have exhausted the stream.
+ // NOTE(review): relies on the shell cursor's internal '_postBatchResumeToken' field.
+ while (bsonWoCompare(csCursor._postBatchResumeToken, startPoint) == 0) {
+ csCursor.hasNext(); // runs a getMore
+ }
+
+ // Close the cursor since we have already retrieved all results.
+ csCursor.close();
+
+ // Extract all events from the streams. Since the cursor is closed, it will not attempt to
+ // retrieve any more batches from the server.
+ return csCursor.toArray();
+}
+
+// Obtain a list of all events that occurred during the write workload. We use 'updateLookup' so
+// that update events carry a 'fullDocument' field for the traversal to discover.
+const allEvents = getAllChangeStreamEvents([], {fullDocument: "updateLookup"});
+
+jsTestLog(`All events: ${tojson(allEvents)}`);
+
+// List of specific fields and values that we wish to test. This will be populated during traversal
+// of the events in the stream, but we can add further paths and extra values which will not appear
+// in the stream but which we nonetheless want to test. Note that null and existence predicates will
+// be tested for every field, and do not need to be explicitly specified here. The format of each
+// entry is as follows:
+//
+// {
+// "full.path.to.field": {
+// extraValues: [special values we wish to test that do not appear in the stream],
+// values: [automatically populated by examining the stream]
+// }
+// }
+const fieldsToBeTested = {
+ // Test documentKey with a field that is in the full object but not in the documentKey.
+ "documentKey": {extraValues: [{f2: null, _id: 1}], values: []},
+ "documentKey.f1": {extraValues: [{subField: true}], values: []}
+};
+
+// Always test these subfields for all parent fields. These never exist in any event, exercising
+// the null-equality and existence rewrites against missing fields.
+const standardSubFieldsToTest = ["nonExistentField"];
+
+// Traverse each event in the stream and build up a map of all field paths.
+allEvents.forEach((event) => traverseEvent(event, fieldsToBeTested));
+
+jsTestLog(`Final set of fields to test: ${tojson(fieldsToBeTested)}`);
+
+// Define the filters that we want to apply to each field. Given a dotted 'fieldPath', returns an
+// array of $match-style predicates: the fixed null/existence comparisons plus one equality
+// predicate for each observed or extra value recorded for that path in 'fieldsToBeTested'.
+function generateMatchFilters(fieldPath) {
+ // Combine the values harvested from the stream with any manually-specified extras.
+ const valuesToTest =
+ fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues);
+
+ const filters = [
+ {[fieldPath]: {$eq: null}},
+ {[fieldPath]: {$ne: null}},
+ {[fieldPath]: {$lte: null}},
+ {[fieldPath]: {$gte: null}},
+ {[fieldPath]: {$exists: true}},
+ {[fieldPath]: {$exists: false}}
+ ];
+
+ for (let value of valuesToTest) {
+ filters.push({[fieldPath]: value});
+ }
+
+ return filters;
+}
+// Counterpart of generateMatchFilters using aggregation $expr syntax. Returns $expr predicates
+// comparing "$fieldPath" against null, against the missing value ($$REMOVE), and against each
+// recorded value for the path. $expr comparison semantics differ from match-language semantics,
+// which is exactly what the rewrite tests need to cover.
+function generateExprFilters(fieldPath) {
+ const valuesToTest =
+ fieldsToBeTested[fieldPath].values.concat(fieldsToBeTested[fieldPath].extraValues);
+
+ const exprFieldPath = "$" + fieldPath;
+ const exprs = [
+ {$expr: {$eq: [exprFieldPath, null]}},
+ {$expr: {$ne: [exprFieldPath, null]}},
+ {$expr: {$lte: [exprFieldPath, null]}},
+ {$expr: {$gte: [exprFieldPath, null]}},
+ {$expr: {$eq: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$ne: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$lte: [exprFieldPath, "$$REMOVE"]}},
+ {$expr: {$gte: [exprFieldPath, "$$REMOVE"]}}
+ ];
+
+ for (let value of valuesToTest) {
+ exprs.push({$expr: {$eq: [exprFieldPath, value]}});
+ }
+
+ return exprs;
+}
+
+// Record all failed test cases to be reported at the end of the test. Failures are collected
+// rather than asserted immediately so that one bad predicate does not mask the others.
+const failedTestCases = [];
+
+// Confirm that the output of an optimized change stream matches an unoptimized stream.
+for (let csConfig of [{fullDocument: "updateLookup"}]) {
+ for (let fieldToTest in fieldsToBeTested) {
+ const predicatesToTest =
+ generateMatchFilters(fieldToTest).concat(generateExprFilters(fieldToTest));
+ for (let predicate of predicatesToTest) {
+ // Create a $match expression for the current predicate.
+ const matchExpr = {$match: predicate};
+
+ jsTestLog(`Testing filter ${tojsononeline(matchExpr)} with ${tojsononeline(csConfig)}`);
+
+ // Construct one optimized pipeline, and one which inhibits optimization. The inhibited
+ // pipeline prevents the $match from being rewritten into the oplog scan, giving the
+ // ground-truth result set.
+ const nonOptimizedPipeline = [{$_internalInhibitOptimization: {}}, matchExpr];
+ const optimizedPipeline = [matchExpr];
+
+ // Extract all results from each of the pipelines.
+ const nonOptimizedOutput = getAllChangeStreamEvents(nonOptimizedPipeline, csConfig);
+ const optimizedOutput = getAllChangeStreamEvents(optimizedPipeline, csConfig);
+
+ // We never expect to see more optimized results than unoptimized.
+ assert(optimizedOutput.length <= nonOptimizedOutput.length,
+ {optimizedOutput: optimizedOutput, nonOptimizedOutput: nonOptimizedOutput});
+
+ // Check the unoptimized results against the optimized results. If we observe an entry
+ // in the non-optimized array that is not present in the optimized, add the details to
+ // 'failedTestCases' and continue.
+ // NOTE(review): this compares positionally, assuming both streams return events in the
+ // same order — confirm this holds in sharded passthroughs.
+ for (let i = 0; i < nonOptimizedOutput.length; ++i) {
+ try {
+ assert(i < optimizedOutput.length);
+ assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i]));
+ } catch (error) {
+ failedTestCases.push({
+ matchExpr: matchExpr,
+ csConfig: csConfig,
+ events: {nonOptimized: nonOptimizedOutput[i], optimized: optimizedOutput[i]}
+ });
+ jsTestLog(`Total failures: ${failedTestCases.length}`);
+ break;
+ }
+ }
+ }
+ }
+}
+
+// Assert that there were no failed test cases.
+assert(failedTestCases.length == 0, failedTestCases);
+})(); \ No newline at end of file