diff options
5 files changed, 1824 insertions, 55 deletions
diff --git a/jstests/sharding/change_stream_match_pushdown_and_rewrite.js b/jstests/sharding/change_stream_match_pushdown_and_rewrite.js index 793653bb737..3714c9077f8 100644 --- a/jstests/sharding/change_stream_match_pushdown_and_rewrite.js +++ b/jstests/sharding/change_stream_match_pushdown_and_rewrite.js @@ -13,7 +13,7 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. const dbName = "change_stream_match_pushdown_and_rewrite"; -const collName = "change_stream_match_pushdown_and_rewrite"; +const collName = "coll1"; const collNameAlternate = "change_stream_match_pushdown_and_rewrite_alternate"; const st = new ShardingTest({ @@ -49,7 +49,7 @@ function getOplogExecutionStatsForShard(stats, shardName) { } // Returns a newly created sharded collection, where shard key is '_id'. -const coll = (() => { +function createShardedCollection(collName, splitAt) { assertDropAndRecreateCollection(db, collName); const coll = db.getCollection(collName); @@ -57,16 +57,19 @@ const coll = (() => { st.ensurePrimaryShard(dbName, st.shard0.shardName); - // Shard the test collection and split it into two chunks: one that contains all {_id: 1} - // documents and one that contains all {_id: 2} documents. + // Shard the test collection and split it into two chunks: one that contains all {_id: <lt + // splitAt>} documents and one that contains all {_id: <gte splitAt>} documents. st.shardColl(collName, {_id: 1} /* shard key */, - {_id: 2} /* split at */, - {_id: 2} /* move the chunk containing {_id: 2} to its own shard */, + {_id: splitAt} /* split at */, + {_id: splitAt} /* move the chunk containing {_id: splitAt} to its own shard */, dbName, true); return coll; -})(); +} + +// Create a sharded collection. +const coll = createShardedCollection(collName, 2 /* splitAt */); // Create a second (unsharded) test collection for validating transactions that insert into multiple // collections. 
@@ -251,7 +254,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); userMatchExpr, aggregateComment, expectedOps, - expectedOplogCursorReturnedDocs) => { + expectedOplogRetDocsForEachShard) => { const cursor = coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr], {comment: aggregateComment}); @@ -279,10 +282,10 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {comment: aggregateComment}); const execStatsShard0 = getOplogExecutionStatsForShard(stats, st.rs0.name); - assert.eq(execStatsShard0.nReturned, expectedOplogCursorReturnedDocs, execStatsShard0); + assert.eq(execStatsShard0.nReturned, expectedOplogRetDocsForEachShard, execStatsShard0); const execStatsShard1 = getOplogExecutionStatsForShard(stats, st.rs1.name); - assert.eq(execStatsShard1.nReturned, expectedOplogCursorReturnedDocs, execStatsShard1); + assert.eq(execStatsShard1.nReturned, expectedOplogRetDocsForEachShard, execStatsShard1); }; // Open a change stream and store the resume token. This resume token will be used to replay the @@ -305,35 +308,35 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: "insert"}}, "change_stream_rewritten_insert_op_type", ["insert"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the 'update' operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: "update"}}, "change_stream_rewritten_update_op_type", ["update"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the 'replace' operation type is rewritten correctly. 
verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: "replace"}}, "change_stream_rewritten_replace_op_type", ["replace"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the 'delete' operation type is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: "delete"}}, "change_stream_rewritten_delete_op_type", ["delete"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the operation type as number is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: 1}}, "change_stream_rewritten_invalid_number_op_type", [] /* expectedOps */, - 0 /* expectedOplogCursorReturnedDocs */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog // format. The oplog cursor should return '4' documents. @@ -341,7 +344,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: "unknown"}}, "change_stream_rewritten_unknown_op_type", [] /* expectedOps */, - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog // format. The oplog cursor should return '4' documents for each shard. @@ -349,7 +352,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: ""}}, "change_stream_rewritten_empty_op_type", [] /* expectedOps */, - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on operation type with inequality operator cannot be rewritten to // the oplog format. The oplog cursor should return '4' documents for each shard. 
@@ -357,7 +360,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: {$gt: "insert"}}}, "change_stream_rewritten_inequality_op_type", ["update", "replace"], - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on operation type sub-field can be rewritten to // the oplog format. The oplog cursor should return '0' documents for each shard. @@ -365,14 +368,14 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {"operationType.subField": "subOperation"}}, "change_stream_rewritten_sub_op_type", [] /* expectedOps */, - 0 /* expectedOplogCursorReturnedDocs */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the operation type with '$in' is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: ["insert", "update"]}}}, "change_stream_rewritten_op_type_with_in_expr", ["insert", "update"], - 2 /* expectedOplogCursorReturnedDocs */); + 2 /* expectedOplogRetDocsForEachShard */); // Ensure that for the '$in' with one valid and one invalid operation type, rewrite to the // oplog format will be abandoned. The oplog cursor should return '4' documents for each shard. @@ -381,7 +384,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: {$in: ["insert", "unknown"]}}}, "change_stream_rewritten_op_type_with_in_expr_with_one_invalid_op_type", ["insert"], - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The // oplog cursor should return '4' documents for each shard. 
@@ -389,14 +392,14 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: {$in: ["unknown"]}}}, "change_stream_rewritten_op_type_with_in_expr_with_unknown_op_type", [] /* expectedOps */, - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$in' with operation type as number is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$in: [1]}}}, "change_stream_rewritten_op_type_with_in_expr_with_number_op_type", [] /* expectedOps */, - 0 /* expectedOplogCursorReturnedDocs */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$in' with operation type as a string and a regex cannot be // rewritten. The oplog cursor should return '4' documents for each shard. @@ -405,14 +408,14 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: {$in: [/^insert$/, "update"]}}}, "change_stream_rewritten_op_type_with_in_expr_with_string_and_regex_op_type", ["insert", "update"] /* expectedOps */, - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' on the operation type with '$nin' is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$nin: ["insert"]}}}, "change_stream_rewritten_op_type_with_nin_expr", ["update", "replace", "delete"], - 3 /* expectedOplogCursorReturnedDocs */); + 3 /* expectedOplogRetDocsForEachShard */); // Ensure that for the '$nin' with one valid and one invalid operation type, rewrite to the // oplog format will be abandoned. The oplog cursor should return '4' documents for each shard. 
@@ -421,14 +424,14 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {operationType: {$nin: ["insert", "unknown"]}}}, "change_stream_rewritten_op_type_with_nin_expr_with_one_invalid_op_type", ["update", "replace", "delete"], - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' with '$nin' with operation type as number is rewritten correctly. verifyNonInvalidatingOps(resumeAfterToken, {$match: {operationType: {$nin: [1]}}}, "change_stream_rewritten_op_type_with_nin_expr_with_number_op_type", ["insert", "update", "replace", "delete"], - 4 /* expectedOplogCursorReturnedDocs */); + 4 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' to match only 'insert' operations is rewritten // correctly. @@ -436,7 +439,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$eq: ["$operationType", "insert"]}}}, "change_stream_rewritten_op_type_eq_insert_in_expr", ["insert"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' to match only 'update' operations is rewritten // correctly. @@ -444,7 +447,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$eq: ["$operationType", "update"]}}}, "change_stream_rewritten_op_type_eq_update_in_expr", ["update"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' to match only 'replace' operations is rewritten // correctly. 
@@ -452,7 +455,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$eq: ["$operationType", "replace"]}}}, "change_stream_rewritten_op_type_eq_replace_in_expr", ["replace"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' to match only 'delete' operations is rewritten // correctly. @@ -460,7 +463,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$eq: ["$operationType", "delete"]}}}, "change_stream_rewritten_op_type_eq_delete_in_expr", ["delete"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' is rewritten correctly when comparing with 'unknown' // operation type. @@ -468,7 +471,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$eq: ["$operationType", "unknown"]}}}, "change_stream_rewritten_op_type_eq_unknown_in_expr", [] /* expectedOps */, - 0 /* expectedOplogCursorReturnedDocs */); + 0 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' is rewritten correctly when '$and' is in the // expression. @@ -485,7 +488,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); }, "change_stream_rewritten_op_type_in_expr_with_and", ["delete"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' is rewritten correctly when '$or' is in the // expression. @@ -502,7 +505,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); }, "change_stream_rewritten_op_type_in_expr_with_or", ["update", "replace", "delete"], - 3 /* expectedOplogCursorReturnedDocs */); + 3 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' is rewritten correctly when '$not' is in the // expression. 
@@ -511,7 +514,7 @@ assert.commandWorked(db.createCollection(collNameAlternate)); {$match: {$expr: {$not: {$regexMatch: {input: "$operationType", regex: /e$/}}}}}, "change_stream_rewritten_op_type_in_expr_with_not", ["insert"], - 1 /* expectedOplogCursorReturnedDocs */); + 1 /* expectedOplogRetDocsForEachShard */); // Ensure that the '$match' using '$expr' is rewritten correctly when nor ({$not: {$or: [...]}}) // is in the expression. @@ -530,7 +533,593 @@ assert.commandWorked(db.createCollection(collNameAlternate)); }, "change_stream_rewritten_op_type_in_expr_with_nor", ["update", "replace"], - 2 /* expectedOplogCursorReturnedDocs */); + 2 /* expectedOplogRetDocsForEachShard */); +})(); + +(function testNamespaceRewrites() { + // A helper that opens a change stream on the whole cluster with the user supplied match + // expression 'userMatchExpr' and validates that: + // 1. for each shard, the events are seen in that order as specified in 'expectedResult' + // 2. the filtering is been done at oplog level + const verifyOnWholeCluster = (resumeAfterToken, + userMatchExpr, + expectedResult, + expectedOplogRetDocsForEachShard) => { + const cursor = db.getSiblingDB("admin").aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ]); + + for (const [collOrDb, opDict] of Object.entries(expectedResult)) { + for (const [op, eventIdentifierList] of Object.entries(opDict)) { + eventIdentifierList.forEach(eventIdentifier => { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + + if (op == "dropDatabase") { + assert.eq(event.ns.db, eventIdentifier, event); + } else if (op == "insert") { + assert.eq(event.documentKey._id, eventIdentifier, event); + } else if (op == "rename") { + assert.eq(event.to.coll, eventIdentifier, event); + } else if (op == "drop") { + assert.eq(event.ns.coll, eventIdentifier); + } else { + assert(false, event); + } + + if (op != 
"dropDatabase") { + assert.eq(event.ns.coll, collOrDb); + } + }); + } + } + + assert(!cursor.hasNext()); + + const stats = db.getSiblingDB("admin").runCommand({ + explain: { + aggregate: 1, + pipeline: [ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ], + cursor: {batchSize: 0} + }, + verbosity: "executionStats" + }); + + const execStatsShard0 = getOplogExecutionStatsForShard(stats, st.rs0.name); + assert.eq(execStatsShard0.nReturned, expectedOplogRetDocsForEachShard, execStatsShard0); + + const execStatsShard1 = getOplogExecutionStatsForShard(stats, st.rs1.name); + assert.eq(execStatsShard1.nReturned, expectedOplogRetDocsForEachShard, execStatsShard1); + }; + + // Create some new collections to ensure that test cases has sufficient namespaces to verify + // that the namespace filtering is working correctly. + const coll2 = createShardedCollection("coll2", 4 /* splitAt */); + const coll3 = createShardedCollection("coll.coll3", 6 /* splitAt */); + const coll4 = createShardedCollection("coll4", 10 /* splitAt */); + + // Open a change stream and store the resume token. This resume token will be used to replay the + // stream after this point. + const resumeAfterToken = + db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken(); + + // For each collection, insert 2 documents, one on each shard. These will create oplog events + // and change stream will apply various namespace filtering on these collections to verify that + // the namespace is rewritten correctly. Each documents also contains field names matching with + // that of '$cmd' operations, ie. 'renameCollection', 'drop' and 'dropDatabase', but with + // value-type other than strings. The 'ns' match filters should gracefully handle the type + // mismatch and not throw any error. + // Each of these inserted documents will be represented in this form in the oplog: + // {... 
"o": {"_id": <id>, "renameCollection": true, "drop": {}, "dropDatabase": null}, ...} + // A few collections are renamed and dropped to verify that these are filtered properly. + assert.commandWorked( + coll.insert({_id: 1, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll.insert({_id: 2, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll2.insert({_id: 3, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll2.insert({_id: 4, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked(coll2.renameCollection("newColl2")); + assert.commandWorked( + coll3.insert({_id: 5, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll3.insert({_id: 6, renameCollection: true, drop: {}, dropDatabase: null})); + assert(coll3.drop()); + + // Insert some documents into 'coll4' with field names which match known command types. Despite + // the fact that these documents could potentially match with the partial 'ns' filter we rewrite + // into the oplog, the {op: "c"} predicate we add into the filter should ensure that they are + // correctly discarded. 
+ assert.commandWorked(coll4.insert( + {_id: 7, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1})); + assert.commandWorked( + coll4.insert({_id: 8, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll4.insert({_id: 9, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""})); + assert.commandWorked(coll4.insert( + {_id: 10, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1})); + assert.commandWorked( + coll4.insert({_id: 11, renameCollection: true, drop: {}, dropDatabase: null})); + assert.commandWorked( + coll4.insert({_id: 12, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""})); + + // This group of tests ensures that the '$match' on a particular namespace object only sees its + // documents and only required document(s) are returned at the oplog for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll1"}}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll2"}}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll.coll3"}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on the namespace with only db component should not emit any + // document and the oplog should not return any documents. + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the namespace object with 'unknown' collection does not exists and the oplog + // cursor returns 0 document. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "unknown"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the namespace object with flipped fields does not match with the namespace object + // and the oplog cursor returns 0 document. + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {coll: "coll1", db: dbName}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the namespace object with extra fields does not match with the namespace object + // and the oplog cursor returns 0 document. + verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll1", extra: "extra"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the empty namespace object does not match with the namespace object and the oplog + // cursor returns 0 document. + verifyOnWholeCluster(resumeAfterToken, {$match: {ns: {}}}, {}, 0); + + // Ensure the '$match' on namespace's db should return documents for all collection and oplog + // should return all documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": dbName}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // These cases ensure that the '$match' on regex of namespace' db, should return documents for + // all collection and oplog should return all documents for each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^change_stream_match_pushdown.*$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^(change_stream_match_pushdown.*$)/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /(^unknown$|^change_stream_match_pushdown.*$)/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^unknown$|^change_stream_match_pushdown.*$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on non-existing db should not return any document and oplog should + // not return any document for each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": "unknown"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on empty db should not return any document and oplog should not + // return any document for each shard. + verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on sub field of db should not return any document and oplog should + // not return any document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db.extra": dbName}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // This group of tests ensures that the '$match' on collection field path should emit only the + // required documents and oplog should return only required document(s) for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll1"}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll2"}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll.coll3"}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // This group of tests ensures that the '$match' on the regex of the collection field path + // should emit only the required documents and oplog should return only required document(s) for + // each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*1/}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*2/}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*3/}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // This group of tests ensures that the '$match' on the regex matching all collections should + // return documents from all collection and oplog should return all document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^CoLL.*/i}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on the regex matching 3 collection should return documents from + // these collections and oplog should return required documents for each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*1$|^col.*2$|^col.*3$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]} + }, + 5 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on the regex to exclude 'coll1', 'coll2' and 'coll4' should return + // only documents from 'coll.coll3' and oplog should return required documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^coll[^124]/}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on non-existing collection should not return any document and oplog + // should not return any document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "unknown"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on empty collection should not return any document and oplog should + // not return any document for each shard. + verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that the '$match' on sub field of collection should not return any document and oplog + // should not return any document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll.extra": "coll1"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' on db should return all documents and oplog should return all documents for + // each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [dbName]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // This group of tests ensures that '$in' on regex matching the db name should return all + // documents and oplog should return all documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [/^change_stream_match.*$/]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [/^change_stream_MATCH.*$/i]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that an empty '$in' on db path should not match any collection and oplog should not + // return any document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: []}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' with invalid db cannot be rewritten and oplog should return all documents + // for each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [dbName, 1]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' on db path with mix of string and regex can be rewritten and oplog should + // return '0' document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: ["unknown1", /^unknown2$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' on multiple collections should return the required documents and oplog + // should return required documents for each shard. + verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns": {$in: [{db: dbName, coll: "coll1"}, {db: dbName, coll: "coll2"}]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", "coll2"]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' on regex of multiple collections should return the required documents and + // oplog should return required documents for each shard. + verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns.coll": {$in: [/^coll1$/, /^coll2$/]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); + + // This group of tests ensures that '$in' on regex of matching all collections should return all + // documents and oplog should return all documents for each shard. 
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: [/^coll.*$/]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: [/^COLL.*$/i]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that an empty '$in' should not match any collection and oplog should not return any + // document for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: []}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' with invalid collection cannot be rewritten and oplog should return all + // documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", 1]}}}, + {coll1: {insert: [1, 2]}}, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' with mix of string and regex matching collections can be rewritten and + // oplog should return required documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", /^coll.*3$/]}}}, + { + coll1: {insert: [1, 2]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + }, + 3 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$in' with mix of string and regex can be rewritten and oplog should + // return '0' document for each shard. 
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.coll": {$in: ["unknown1", /^unknown2$/]}}},
+                         {},
+                         0 /* expectedOplogRetDocsForEachShard */);
+
+    // This group of tests ensures that '$nin' on db path should return all documents and oplog
+    // should return all documents for each shard.
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.db": {$nin: []}}},
+                         {
+                             coll1: {insert: [1, 2]},
+                             coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+                             "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+                             "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+                         },
+                         8 /* expectedOplogRetDocsForEachShard */);
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.db": {$nin: ["unknown"]}}},
+                         {
+                             coll1: {insert: [1, 2]},
+                             coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+                             "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+                             "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+                         },
+                         8 /* expectedOplogRetDocsForEachShard */);
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.db": {$nin: [/^unknown$/]}}},
+                         {
+                             coll1: {insert: [1, 2]},
+                             coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+                             "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+                             "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+                         },
+                         8 /* expectedOplogRetDocsForEachShard */);
+
+    // This group of tests ensures that '$nin' on matching db name should not return any documents
+    // and oplog should return '0' documents for each shard.
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.db": {$nin: [dbName]}}},
+                         {},
+                         0 /* expectedOplogRetDocsForEachShard */);
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/]}}},
+                         {},
+                         0 /* expectedOplogRetDocsForEachShard */);
+
+    // Ensure that '$nin' on multiple collections should return the required documents and oplog
+    // should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: ["coll1", "coll2", "coll4"]}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$nin' on regex of multiple collections should return the required documents and + // oplog should return required documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: [/^coll1$/, /^coll2$/, /^coll4$/]}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$nin' on regex of matching all collections should not return any document and + // oplog should return '0' documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: [/^coll.*$/, /^sys.*$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + + // Ensure that an empty '$nin' should match all collections and oplog should return all + // documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: []}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$nin' with invalid collection cannot be rewritten and oplog should + // return all documents for each shard. + verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: ["coll1", 1]}}}, + { + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + coll4: {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + + // Ensure that '$nin' with mix of string and regex can be rewritten and oplog should + // return required documents for each shard. 
+    verifyOnWholeCluster(resumeAfterToken,
+                         {$match: {"ns.coll": {$nin: ["coll1", /^coll2$/, "coll4"]}}},
+                         {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+                         2 /* expectedOplogRetDocsForEachShard */);
+
+    // At this stage, coll2 has been renamed to 'newColl2' and coll3 has been dropped. The test
+    // from here will drop the database and ensure that the 'ns' filter when applied over the
+    // collection should only emit the 'drop' event for that collection and not the 'dropDatabase'
+    // event. It should be noted that for 'newColl2' and 'coll3', the 'dropDatabase' will be a
+    // no-op and will not emit any documents.
+
+    // Open a new change stream and verify that from here onwards the events related to
+    // 'dropDatabase' are seen.
+    const secondResumeAfterToken =
+        db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+    assert.commandWorked(db.dropDatabase());
+
+    // This group of tests ensures that the match on 'coll1' only sees the 'drop' events.
+    verifyOnWholeCluster(secondResumeAfterToken,
+                         {$match: {ns: {db: dbName, coll: "coll1"}}},
+                         {coll1: {drop: ["coll1", "coll1"]}},
+                         1 /* expectedOplogRetDocsForEachShard */);
+    verifyOnWholeCluster(secondResumeAfterToken,
+                         {$match: {"ns.coll": "coll1"}},
+                         {coll1: {drop: ["coll1", "coll1"]}},
+                         1 /* expectedOplogRetDocsForEachShard */);
+    verifyOnWholeCluster(secondResumeAfterToken,
+                         {$match: {"ns.coll": /^col.*1/}},
+                         {coll1: {drop: ["coll1", "coll1"]}},
+                         1 /* expectedOplogRetDocsForEachShard */);
+
+    verifyOnWholeCluster(
+        secondResumeAfterToken,
+        {$match: {ns: {db: dbName}}},
+        {change_stream_match_pushdown_and_rewrite_and_rewrite: {dropDatabase: [dbName, dbName]}},
+        1 /* expectedOplogRetDocsForEachShard */);
 })();

 st.stop();
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index af296aefed7..7bba41897d7 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -377,6 +377,343 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
     return rewrittenPredicate;
 }
 
+// Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName",
+// coll: "collName"} into the oplog.
+
+// - By default, the rewrite is performed onto the given 'nsField' which specifies an oplog field
+//   containing a complete namespace string, e.g. {ns: "dbName.collName"}.
+// - If 'nsFieldIsCmdNs' is true, then 'nsField' only contains the command-namespace of the
+//   database, i.e. "dbName.$cmd".
+// - With 'nsFieldIsCmdNs' set to true, the caller can also optionally provide 'collNameField'
+//   which is the field containing the collection name. The 'collNameField' may be absent, which
+//   means that the operation being rewritten has a 'db' field in the change stream event, but no
+//   'coll' field.
+std::unique_ptr<MatchExpression> matchRewriteGenericNamespace( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const PathMatchExpression* predicate, + StringData nsField, + bool nsFieldIsCmdNs = false, + boost::optional<StringData> collNameField = boost::none) { + // A collection name can only be specified with 'nsFieldIsCmdNs' set to true. + tassert(5554100, + "Cannot specify 'collNameField' with 'nsFieldIsCmdNs' set to false", + !(!nsFieldIsCmdNs && collNameField)); + + // Performs a rewrite based on the type of argument specified in the MatchExpression. + auto getRewrittenNamespace = [&](auto&& nsElem) -> std::unique_ptr<MatchExpression> { + switch (nsElem.type()) { + case BSONType::Object: { + // Handles case with full namespace object, like '{ns: {db: "db", coll: "coll"}}'. + // There must be a single part to the field path, ie. 'ns'. + if (predicate->fieldRef()->numParts() > 1) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // Extract the object from the RHS of the predicate. + auto nsObj = nsElem.embeddedObject(); + + // If a full namespace, or a collNameField were specified, there must be 2 fields in + // the object, i.e. db and coll. + if ((!nsFieldIsCmdNs || collNameField) && nsObj.nFields() != 2) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + // Otherwise, there can only be 1 field in the object, i.e. db. + if (nsFieldIsCmdNs && !collNameField && nsObj.nFields() != 1) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // Extract the db and collection from the 'ns' object. The 'collElem' will point to + // the eoo, if it is not present. + BSONObjIterator iter{nsObj}; + auto dbElem = iter.next(); + auto collElem = iter.next(); + + // Verify that the first field is 'db' and is of type string. We should always have + // a db entry no matter what oplog fields we are operating on. 
+ if (dbElem.fieldNameStringData() != "db" || dbElem.type() != BSONType::String) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + // Verify that the second field is 'coll' and is of type string, if it exists. + if (collElem && + (collElem.fieldNameStringData() != "coll" || + collElem.type() != BSONType::String)) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + if (nsFieldIsCmdNs) { + auto rewrittenPred = std::make_unique<AndMatchExpression>(); + rewrittenPred->add(std::make_unique<EqualityMatchExpression>( + nsField, Value(dbElem.str() + ".$cmd"))); + + if (collNameField) { + // If we are rewriting to a combination of cmdNs and collName, we match on + // both. + rewrittenPred->add(std::make_unique<EqualityMatchExpression>( + *collNameField, Value(collElem.str()))); + } + return rewrittenPred; + } + + // Otherwise, we are rewriting to a full namespace field. Convert the object's + // subfields into an exact match on the oplog field. + return std::make_unique<EqualityMatchExpression>( + nsField, Value(dbElem.str() + "." + collElem.str())); + } + case BSONType::String: { + // Handles case with field path, like '{"ns.coll": "coll"}'. There must be 2 parts + // to the field path, ie. 'ns' and '[db | coll]'. + if (predicate->fieldRef()->numParts() != 2) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // Extract the second field and verify that it is either 'db' or 'coll'. + auto fieldName = predicate->fieldRef()->getPart(1); + if (fieldName != "db" && fieldName != "coll") { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // If the predicate is on 'coll' but we only have a db, we will never match. + if (fieldName == "coll" && nsFieldIsCmdNs && !collNameField) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // If the predicate is on 'db' and 'nsFieldIsCmdNs' is set to true, match the $cmd + // namespace. 
+ if (nsFieldIsCmdNs && fieldName == "db") { + return std::make_unique<EqualityMatchExpression>(nsField, + Value(nsElem.str() + ".$cmd")); + } + // If the predicate is on 'coll', match the 'collNameField' if we have one. + if (collNameField && fieldName == "coll") { + return std::make_unique<EqualityMatchExpression>(*collNameField, + Value(nsElem.str())); + } + + // Otherwise, we are rewriting this predicate to operate on a field containing the + // full namespace. If the predicate is on 'db', match all collections in that DB. If + // the predicate is on 'coll', match that collection in all DBs. + auto nsRegex = [&]() { + if (fieldName == "db") { + return "^" + + DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) + + "\\." + DocumentSourceChangeStream::kRegexAllCollections; + } + return DocumentSourceChangeStream::kRegexAllDBs + "\\." + + DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) + + "$"; + }(); + + return std::make_unique<RegexMatchExpression>(nsField, nsRegex, ""); + } + case BSONType::RegEx: { + // Handles case with field path having regex, like '{"ns.db": /^db$/}'. There must + // be 2 parts to the field path, ie. 'ns' and '[db | coll]'. + if (predicate->fieldRef()->numParts() != 2) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // Extract the second field and verify that it either 'db' or 'coll'. + auto fieldName = predicate->fieldRef()->getPart(1); + if (fieldName != "db" && fieldName != "coll") { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // If the predicate is on 'coll' but we only have a db, we will never match. + if (fieldName == "coll" && nsFieldIsCmdNs && !collNameField) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + // Rather than attempting to rewrite the regex to apply to the oplog field, we will + // instead write an $expr to extract the dbName or collName from the oplog field, + // and apply the unmodified regex directly to it. 
First get a reference to the + // relevant field in the oplog entry. + std::string exprFieldRef = "'$" + + (fieldName == "db" ? nsField : (!nsFieldIsCmdNs ? nsField : *collNameField)) + + "'"; + + // Wrap the field in an expression to return MISSING if the field is not a string, + // since this expression may execute on CRUD oplog entries with clashing fieldnames. + // We will make this available to other expressions as the variable '$$oplogField'. + std::string exprOplogField = str::stream() + << "{$cond: {if: {$eq: [{$type: " << exprFieldRef + << "}, 'string']}, then: " << exprFieldRef << ", else: '$$REMOVE'}}"; + + // Now create an expression to extract the db or coll name from the oplog entry. + std::string exprDbOrCollName = [&]() -> std::string { + // If the query is on 'coll' and we have a collName field, use it as-is. + if (fieldName == "coll" && collNameField) { + return "'$$oplogField'"; + } + + // Otherwise, we need to split apart a full ns string. Find the separator. + // Return 0 if input is null in order to prevent throwing in $substrBytes. + std::string exprDotPos = + "{$ifNull: [{$indexOfBytes: ['$$oplogField', '.']}, 0]}"; + + // If the query is on 'db', return everything up to the separator. + if (fieldName == "db") { + return "{$substrBytes: ['$$oplogField', 0, " + exprDotPos + "]}"; + } + + // Otherwise, the query is on 'coll'. Return everything from (separator + 1) + // to the end of the string. + return str::stream() << "{$substrBytes: ['$$oplogField', {$add: [1, " + << exprDotPos << "]}, -1]}"; + }(); + + // Convert the MatchExpression $regex into a $regexMatch on the corresponding field. + std::string exprRegexMatch = str::stream() + << "{$regexMatch: {input: " << exprDbOrCollName << ", regex: '" + << nsElem.regex() << "', options: '" << nsElem.regexFlags() << "'}}"; + + // Finally, wrap the regex in a $let which defines the '$$oplogField' variable. 
+ std::string exprRewrittenPredicate = str::stream() + << "{$let: {vars: {oplogField: " << exprOplogField + << "}, in: " << exprRegexMatch << "}}"; + + // Return a new ExprMatchExpression with the rewritten $regexMatch. + return std::make_unique<ExprMatchExpression>( + BSON("" << fromjson(exprRewrittenPredicate)).firstElement(), expCtx); + } + default: + break; + } + return nullptr; + }; + + // It is only feasible to attempt to rewrite a limited set of predicates here. + switch (predicate->matchType()) { + case MatchExpression::EQ: + case MatchExpression::INTERNAL_EXPR_EQ: { + auto eqME = static_cast<const ComparisonMatchExpressionBase*>(predicate); + return getRewrittenNamespace(eqME->getData()); + } + case MatchExpression::REGEX: { + // Create the BSON element from the regex match expression and return a rewritten match + // expression, if possible. + auto regME = static_cast<const RegexMatchExpression*>(predicate); + BSONObjBuilder regexBob; + regME->serializeToBSONTypeRegex(®exBob); + return getRewrittenNamespace(regexBob.obj().firstElement()); + } + case MatchExpression::MATCH_IN: { + auto inME = static_cast<const InMatchExpression*>(predicate); + + // An empty '$in' should not match anything. + if (inME->getEqualities().empty() && inME->getRegexes().empty()) { + return std::make_unique<AlwaysFalseMatchExpression>(); + } + + auto rewrittenOr = std::make_unique<OrMatchExpression>(); + + // For each equality expression, add the rewritten sub-expression to the '$or' + // expression. Abandon the entire rewrite, if any of the rewrite fails. + for (const auto& elem : inME->getEqualities()) { + if (auto rewrittenExpr = getRewrittenNamespace(elem)) { + rewrittenOr->add(std::move(rewrittenExpr)); + continue; + } + return nullptr; + } + + // For each regex expression, add the rewritten sub-expression to the '$or' expression. + // Abandon the entire rewrite, if any of the rewrite fails. 
+ for (const auto& regME : inME->getRegexes()) { + BSONObjBuilder regexBob; + regME->serializeToBSONTypeRegex(®exBob); + if (auto rewrittenExpr = getRewrittenNamespace(regexBob.obj().firstElement())) { + rewrittenOr->add(std::move(rewrittenExpr)); + continue; + } + return nullptr; + } + return rewrittenOr; + } + default: + break; + } + + // If we have reached here, this is a predicate which we cannot rewrite. + return nullptr; +} + +/** + * Rewrites filters on 'ns' in a format that can be applied directly to the oplog. + * Returns nullptr if the predicate cannot be rewritten. + */ +std::unique_ptr<MatchExpression> matchRewriteNs( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const PathMatchExpression* predicate, + bool allowInexact) { + // We should only ever see predicates on the 'ns' field. + tassert(5554101, "Unexpected empty path", !predicate->path().empty()); + tassert(5554102, + str::stream() << "Unexpected predicate on " << predicate->path(), + predicate->fieldRef()->getPart(0) == DocumentSourceChangeStream::kNamespaceField); + + // + // CRUD events + // + + // CRUD ops are rewritten to the 'ns' field that contains a full namespace string. + auto crudNsRewrite = matchRewriteGenericNamespace(expCtx, predicate, "ns"_sd); + + // If we can't rewrite this predicate for CRUD operations, then we don't expect to be able to + // rewrite it for any other operations either. + if (!crudNsRewrite) { + return nullptr; + } + + // Create the final namespace filter for CRUD operations, i.e. {op: {$ne: 'c'}}. + auto crudNsFilter = std::make_unique<AndMatchExpression>(); + crudNsFilter->add( + MatchExpressionParser::parseAndNormalize(fromjson("{op: {$ne: 'c'}}"), expCtx)); + crudNsFilter->add(std::move(crudNsRewrite)); + + // + // Command events + // + + // Group together all command event cases. + auto cmdCases = std::make_unique<OrMatchExpression>(); + + // The 'rename' event is rewritten to a field that contains the full namespace string. 
+ auto renameNsRewrite = matchRewriteGenericNamespace(expCtx, predicate, "o.renameCollection"_sd); + tassert(5554103, "Unexpected rewrite failure", renameNsRewrite); + cmdCases->add(std::move(renameNsRewrite)); + + // The 'drop' event is rewritten to the cmdNs in 'ns' and the collection name in 'o.drop'. + auto dropNsRewrite = matchRewriteGenericNamespace( + expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */, "o.drop"_sd); + tassert(5554104, "Unexpected rewrite failure", dropNsRewrite); + cmdCases->add(std::move(dropNsRewrite)); + + // The 'dropDatabase' event is rewritten to the cmdNs in 'ns'. It does not have a collection + // field. + auto dropDbNsRewrite = + matchRewriteGenericNamespace(expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */); + tassert(5554105, "Unexpected rewrite failure", dropDbNsRewrite); + auto andDropDbNsRewrite = std::make_unique<AndMatchExpression>(std::move(dropDbNsRewrite)); + andDropDbNsRewrite->add(std::make_unique<EqualityMatchExpression>("o.dropDatabase", Value(1))); + cmdCases->add(std::move(andDropDbNsRewrite)); + + // Create the final namespace filter for {op: 'c'} operations. + auto cmdNsFilter = std::make_unique<AndMatchExpression>(); + cmdNsFilter->add(MatchExpressionParser::parseAndNormalize(fromjson("{op: 'c'}"), expCtx)); + cmdNsFilter->add(std::move(cmdCases)); + + // + // Build final 'ns' filter + // + + // Construct the final rewritten predicate from each of the rewrite categories. + auto rewrittenPredicate = std::make_unique<OrMatchExpression>(); + rewrittenPredicate->add(std::move(crudNsFilter)); + rewrittenPredicate->add(std::move(cmdNsFilter)); + + return rewrittenPredicate; +} + // Map of fields names for which a simple rename is sufficient when rewriting. 
StringMap<std::string> renameRegistry = { {"clusterTime", "ts"}, {"lsid", "lsid"}, {"txnNumber", "txnNumber"}}; @@ -385,7 +722,8 @@ StringMap<std::string> renameRegistry = { StringMap<MatchExpressionRewrite> matchRewriteRegistry = { {"operationType", matchRewriteOperationType}, {"documentKey", matchRewriteDocumentKey}, - {"fullDocument", matchRewriteFullDocument}}; + {"fullDocument", matchRewriteFullDocument}, + {"ns", matchRewriteNs}}; // Map of field names to corresponding agg Expression rewrite functions. StringMap<AggExpressionRewrite> exprRewriteRegistry = {{"operationType", exprRewriteOperationType}}; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 85eb5b8195d..496091eee1c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -61,19 +61,6 @@ #include "mongo/db/vector_clock.h" namespace mongo { -namespace { -std::string regexEscape(StringData source) { - std::string result = ""; - std::string escapes = "*+|()^?[]./\\$"; - for (const char& c : source) { - if (escapes.find(c) != std::string::npos) { - result.append("\\"); - } - result += c; - } - return result; -} -} // namespace using boost::intrusive_ptr; using boost::optional; @@ -144,11 +131,12 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac switch (type) { case ChangeStreamType::kSingleCollection: // Match the target namespace exactly. - return "^" + regexEscape(nss.ns()) + "$"; + return "^" + regexEscapeNsForChangeStream(nss.ns()) + "$"; case ChangeStreamType::kSingleDatabase: // Match all namespaces that start with db name, followed by ".", then NOT followed by // '$' or 'system.' - return "^" + regexEscape(nss.db().toString()) + "\\." + kRegexAllCollections; + return "^" + regexEscapeNsForChangeStream(nss.db().toString()) + "\\." 
+ + kRegexAllCollections; case ChangeStreamType::kAllChangesForCluster: // Match all namespaces that start with any db name other than admin, config, or local, // followed by ".", then NOT followed by '$' or 'system.'. @@ -163,7 +151,7 @@ std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const Namesp switch (type) { case ChangeStreamType::kSingleCollection: // Match the target collection exactly. - return "^" + regexEscape(nss.coll()) + "$"; + return "^" + regexEscapeNsForChangeStream(nss.coll()) + "$"; case ChangeStreamType::kSingleDatabase: case ChangeStreamType::kAllChangesForCluster: // Match any collection; database filtering will be done elsewhere. @@ -179,7 +167,7 @@ std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const Names case ChangeStreamType::kSingleCollection: case ChangeStreamType::kSingleDatabase: // Match the target database command namespace exactly. - return "^" + regexEscape(nss.getCommandNS().ns()) + "$"; + return "^" + regexEscapeNsForChangeStream(nss.getCommandNS().ns()) + "$"; case ChangeStreamType::kAllChangesForCluster: // Match all command namespaces on any database. return kRegexAllDBs + "\\." 
+ kRegexCmdColl; @@ -188,6 +176,18 @@ std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const Names } } +std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData source) { + std::string result = ""; + std::string escapes = "*+|()^?[]./\\$"; + for (const char& c : source) { + if (escapes.find(c) != std::string::npos) { + result.append("\\"); + } + result += c; + } + return result; +} + ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec( const DocumentSourceChangeStreamSpec& spec) { if (spec.getStartAfter()) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 5cf43b37988..3bd12f97a89 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -184,6 +184,8 @@ public: static std::string getCollRegexForChangeStream(const NamespaceString& nss); static std::string getCmdNsRegexForChangeStream(const NamespaceString& nss); + static std::string regexEscapeNsForChangeStream(StringData source); + /** * Parses a $changeStream stage from 'elem' and produces the $match and transformation * stages required. 
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index a0479604175..27e9b03097f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -3954,7 +3954,37 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeT "$_internalChangeStreamEnsureResumeTokenPresent"}); } -using ChangeStreamRewriteTest = AggregationContextFixture; +class ChangeStreamRewriteTest : public AggregationContextFixture { +public: + std::string getNsDbRegexMatchExpr(const std::string& field, const std::string& regex) { + return str::stream() + << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field + << "']}, {$const: 'string'}]}, '" << field + << "', '$$REMOVE']}}, in: {$regexMatch: {input: {$substrBytes: ['$$oplogField', " + "{$const: 0}, {$ifNull: [{$indexOfBytes: ['$$oplogField', {$const: " + "'.'}]}, {$const: 0}]}]}, regex: {$const: '" + << regex << "'}, options: {$const: ''}}}}}}"; + } + + std::string getNsCollRegexMatchExpr(const std::string& field, const std::string& regex) { + if (field == "$o.drop") { + return str::stream() + << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field + << "']}, {$const: 'string'}]}, '" << field + << "', '$$REMOVE']}}, in: {$regexMatch: {input: '$$oplogField', regex: {$const: '" + << regex << "'}, options: {$const: ''}}}}}}"; + } + + return str::stream() + << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field + << "']}, {$const: 'string'}]}, '" << field + << "', '$$REMOVE']}}, in: {$regexMatch: {input: {$substrBytes: ['$$oplogField', {$add: " + "[{$const: 1}, " + "{$ifNull: [{$indexOfBytes: ['$$oplogField', {$const: '.'}]}, {$const: 0}]}]}, " + "{$const: -1}]}, regex: {$const: '" + << regex << "'}, options: {$const: ''}} }}}}"; + } +}; TEST_F(ChangeStreamRewriteTest, 
RewriteOrPredicateOnRenameableFields) { auto spec = fromjson("{$or: [{clusterTime: {$type: [17]}}, {lsid: {$type: [16]}}]}"); @@ -4707,5 +4737,815 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocument // rewritten predicate that matches exactly. ASSERT(rewrittenMatchExpression == nullptr); } + +TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse( + BSON("ns" << BSON("db" << expCtx->ns.db() << "coll" << expCtx->ns.coll())), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + const std::string ns = expCtx->ns.db().toString() + "." + expCtx->ns.coll().toString(); + const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd"; + + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), BSON("ns" << BSON("$eq" << ns)))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON("o.renameCollection" << BSON("$eq" << ns)), + BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)), + BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())))), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithSwappedField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse( + BSON("ns" << BSON("coll" << expCtx->ns.coll() << "db" << expCtx->ns.db())), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = 
rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyDbField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns" << BSON("db" << expCtx->ns.db())), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1}"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyCollectionField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns" << BSON("coll" << expCtx->ns.coll())), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), 
fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidDbField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse( + BSON("ns" << BSON("db" << 1 << "coll" << expCtx->ns.coll())), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidCollField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse( + BSON("ns" << BSON("db" << expCtx->ns.db() << "coll" << 1)), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + 
fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithExtraField) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse( + fromjson("{ns: {db: 'db', coll: 'coll', extra: 'extra'}}"), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithStringDbFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns.db" << expCtx->ns.db()), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + const std::string regexNs = "^" + expCtx->ns.db().toString() + "\\." 
+ + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd"; + + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON("ns" << BSON("$regex" << regexNs)))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)), + BSON("ns" << BSON("$eq" << cmdNs)), + BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithCollectionFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns.coll" << expCtx->ns.coll()), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + const std::string regexNs = DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." 
+ + expCtx->ns.coll().toString() + "$"; + + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON("ns" << BSON("$regex" << regexNs)))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)), + BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())), + BSON(AND(fromjson("{$alwaysFalse: 1}"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexDbFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns.db" << BSONRegEx(R"(^unit.*$)")), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^unit.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), + BSON(AND(fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexCollectionFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = + MatchExpressionParser::parse(BSON("ns.coll" << BSONRegEx(R"(^pipeline.*$)")), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + 
ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^pipeline.*$)")))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", + R"(^pipeline.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^pipeline.*$)")), + BSON(AND(fromjson("{ $alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInvalidDbFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.db" << 1), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInvalidCollectionFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.coll" << 1), expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraDbFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.db.subField" + << "subDb"), + expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), 
fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraCollectionFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.coll.subField" + << "subColl"), + expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInvalidFieldPath) { + auto expCtx = getExpCtx(); + auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.unknown" + << "test"), + expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 
1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDb) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$in: ['test', 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string firstRegexNs = std::string("^") + "news" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string secondRegexNs = std::string("^") + "test" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string firstCmdNs = "news.$cmd"; + const std::string secondCmdNs = "test.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << firstRegexNs)), + BSON("ns" << BSON("$regex" << secondRegexNs)))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)), + BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))), + BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), + BSON("ns" << BSON("$eq" << secondCmdNs)))), + BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), + BSON("ns" << BSON("$eq" << secondCmdNs)))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDb) { + auto expCtx = getExpCtx(); + auto expr = BSON("ns.db" << BSON("$nin" << BSON_ARRAY("test" + << "news"))); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + 
expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string firstRegexNs = std::string("^") + "test" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string secondRegexNs = std::string("^") + "news" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string firstCmdNs = "test.$cmd"; + const std::string secondCmdNs = "news.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + BSON("ns" << BSON("$regex" << firstRegexNs)))))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))), + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + BSON("ns" << BSON("$eq" << firstCmdNs)))), + BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + BSON("ns" << BSON("$eq" << firstCmdNs)))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollection) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$in: ['test', 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string firstRegexNs = + DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "news" + "$"; + const std::string secondRegexNs = + DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." 
+ "test" + "$"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << firstRegexNs)), + BSON("ns" << BSON("$regex" << secondRegexNs)))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)), + BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + BSON("o.drop" << BSON("$eq" + << "test")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), + fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnCollection) { + auto expCtx = getExpCtx(); + auto expr = BSON("ns.coll" << BSON("$nin" << BSON_ARRAY("test" + << "news"))); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string firstRegexNs = + DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "test" + "$"; + const std::string secondRegexNs = + DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." 
+ "news" + "$"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + BSON("ns" << BSON("$regex" << firstRegexNs)))))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + BSON("o.drop" << BSON("$eq" + << "test")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), + fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnDb) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$in: [/^test.*$/, /^news$/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + 
fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnDb) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$nin: [/^test.*$/, /^news$/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnCollection) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$in: [/^test.*$/, /^news$/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR( 
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^news$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnCollection) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$nin: [/^test.*$/, /^news$/]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^news$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, 
CanRewriteNamespaceWithInExpressionOnDbWithRegexAndString) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$in: [/^test.*$/, 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string secondRegexNs = std::string("^") + "news" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string secondCmdNs = "news.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))), + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDbWithRegexAndString) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$nin: [/^test.*$/, 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + + const std::string secondRegexNs = std::string("^") + 
"news" + "\\." + + DocumentSourceChangeStream::kRegexAllCollections.toString(); + const std::string secondCmdNs = "news.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR( + BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))), + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollectionWithRegexAndString) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$in: [/^test.*$/, 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string secondRegexNs = + DocumentSourceChangeStream::kRegexAllDBs + "\\." 
+ "news" + "$"; + const std::string secondCmdNs = "news.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", + R"(^test.*$)")))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), + fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, + CanRewriteNamespaceWithNinExpressionOnCollectionWithRegexAndString) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$nin: [/^test.*$/, 'news']}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + const std::string secondRegexNs = + DocumentSourceChangeStream::kRegexAllDBs + "\\." 
+ "news" + "$"; + const std::string secondCmdNs = "news.$cmd"; + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR(BSON(OR( + BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", + R"(^test.*$)")))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), + fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidDb) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$in: ['test', 1]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithNinExpressionOnInvalidDb) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$nin: ['test', 1]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidCollection) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$in: ['coll', 
1]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithNinExpressionOnInvalidCollection) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.coll': {$nin: ['coll', 1]}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT_FALSE(rewrittenMatchExpression); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyInExpression) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$in: []}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); +} + +TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyNinExpression) { + auto expCtx = getExpCtx(); + auto expr = fromjson("{'ns.db': {$nin: []}}"); + auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx); + 
ASSERT_OK(statusWithMatchExpression.getStatus()); + + auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields( + expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); + ASSERT(rewrittenMatchExpression); + + auto rewrittenPredicate = rewrittenMatchExpression->serialize(); + ASSERT_BSONOBJ_EQ( + rewrittenPredicate, + BSON(NOR( + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), + BSON(AND(fromjson("{op: {$eq: 'c'}}"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), + fromjson("{$alwaysFalse: 1 }"), + BSON(AND(fromjson("{$alwaysFalse: 1 }"), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); +} + } // namespace } // namespace mongo |