summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/change_stream_match_pushdown_and_rewrite.js661
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp340
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp34
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp842
5 files changed, 1824 insertions, 55 deletions
diff --git a/jstests/sharding/change_stream_match_pushdown_and_rewrite.js b/jstests/sharding/change_stream_match_pushdown_and_rewrite.js
index 793653bb737..3714c9077f8 100644
--- a/jstests/sharding/change_stream_match_pushdown_and_rewrite.js
+++ b/jstests/sharding/change_stream_match_pushdown_and_rewrite.js
@@ -13,7 +13,7 @@
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
const dbName = "change_stream_match_pushdown_and_rewrite";
-const collName = "change_stream_match_pushdown_and_rewrite";
+const collName = "coll1";
const collNameAlternate = "change_stream_match_pushdown_and_rewrite_alternate";
const st = new ShardingTest({
@@ -49,7 +49,7 @@ function getOplogExecutionStatsForShard(stats, shardName) {
}
// Returns a newly created sharded collection, where shard key is '_id'.
-const coll = (() => {
+function createShardedCollection(collName, splitAt) {
assertDropAndRecreateCollection(db, collName);
const coll = db.getCollection(collName);
@@ -57,16 +57,19 @@ const coll = (() => {
st.ensurePrimaryShard(dbName, st.shard0.shardName);
- // Shard the test collection and split it into two chunks: one that contains all {_id: 1}
- // documents and one that contains all {_id: 2} documents.
+ // Shard the test collection and split it into two chunks: one that contains all {_id: <lt
+ // splitAt>} documents and one that contains all {_id: <gte splitAt>} documents.
st.shardColl(collName,
{_id: 1} /* shard key */,
- {_id: 2} /* split at */,
- {_id: 2} /* move the chunk containing {_id: 2} to its own shard */,
+ {_id: splitAt} /* split at */,
+ {_id: splitAt} /* move the chunk containing {_id: splitAt} to its own shard */,
dbName,
true);
return coll;
-})();
+}
+
+// Create a sharded collection.
+const coll = createShardedCollection(collName, 2 /* splitAt */);
// Create a second (unsharded) test collection for validating transactions that insert into multiple
// collections.
@@ -251,7 +254,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
userMatchExpr,
aggregateComment,
expectedOps,
- expectedOplogCursorReturnedDocs) => {
+ expectedOplogRetDocsForEachShard) => {
const cursor =
coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr],
{comment: aggregateComment});
@@ -279,10 +282,10 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{comment: aggregateComment});
const execStatsShard0 = getOplogExecutionStatsForShard(stats, st.rs0.name);
- assert.eq(execStatsShard0.nReturned, expectedOplogCursorReturnedDocs, execStatsShard0);
+ assert.eq(execStatsShard0.nReturned, expectedOplogRetDocsForEachShard, execStatsShard0);
const execStatsShard1 = getOplogExecutionStatsForShard(stats, st.rs1.name);
- assert.eq(execStatsShard1.nReturned, expectedOplogCursorReturnedDocs, execStatsShard1);
+ assert.eq(execStatsShard1.nReturned, expectedOplogRetDocsForEachShard, execStatsShard1);
};
// Open a change stream and store the resume token. This resume token will be used to replay the
@@ -305,35 +308,35 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: "insert"}},
"change_stream_rewritten_insert_op_type",
["insert"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the 'update' operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: "update"}},
"change_stream_rewritten_update_op_type",
["update"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the 'replace' operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: "replace"}},
"change_stream_rewritten_replace_op_type",
["replace"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the 'delete' operation type is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: "delete"}},
"change_stream_rewritten_delete_op_type",
["delete"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the operation type as number is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: 1}},
"change_stream_rewritten_invalid_number_op_type",
[] /* expectedOps */,
- 0 /* expectedOplogCursorReturnedDocs */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog
// format. The oplog cursor should return '4' documents.
@@ -341,7 +344,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: "unknown"}},
"change_stream_rewritten_unknown_op_type",
[] /* expectedOps */,
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog
// format. The oplog cursor should return '4' documents for each shard.
@@ -349,7 +352,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: ""}},
"change_stream_rewritten_empty_op_type",
[] /* expectedOps */,
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on operation type with inequality operator cannot be rewritten to
// the oplog format. The oplog cursor should return '4' documents for each shard.
@@ -357,7 +360,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: {$gt: "insert"}}},
"change_stream_rewritten_inequality_op_type",
["update", "replace"],
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on operation type sub-field can be rewritten to
// the oplog format. The oplog cursor should return '0' documents for each shard.
@@ -365,14 +368,14 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {"operationType.subField": "subOperation"}},
"change_stream_rewritten_sub_op_type",
[] /* expectedOps */,
- 0 /* expectedOplogCursorReturnedDocs */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the operation type with '$in' is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: ["insert", "update"]}}},
"change_stream_rewritten_op_type_with_in_expr",
["insert", "update"],
- 2 /* expectedOplogCursorReturnedDocs */);
+ 2 /* expectedOplogRetDocsForEachShard */);
// Ensure that for the '$in' with one valid and one invalid operation type, rewrite to the
// oplog format will be abandoned. The oplog cursor should return '4' documents for each shard.
@@ -381,7 +384,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: {$in: ["insert", "unknown"]}}},
"change_stream_rewritten_op_type_with_in_expr_with_one_invalid_op_type",
["insert"],
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The
// oplog cursor should return '4' documents for each shard.
@@ -389,14 +392,14 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: {$in: ["unknown"]}}},
"change_stream_rewritten_op_type_with_in_expr_with_unknown_op_type",
[] /* expectedOps */,
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$in' with operation type as number is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$in: [1]}}},
"change_stream_rewritten_op_type_with_in_expr_with_number_op_type",
[] /* expectedOps */,
- 0 /* expectedOplogCursorReturnedDocs */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$in' with operation type as a string and a regex cannot be
// rewritten. The oplog cursor should return '4' documents for each shard.
@@ -405,14 +408,14 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: {$in: [/^insert$/, "update"]}}},
"change_stream_rewritten_op_type_with_in_expr_with_string_and_regex_op_type",
["insert", "update"] /* expectedOps */,
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the operation type with '$nin' is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$nin: ["insert"]}}},
"change_stream_rewritten_op_type_with_nin_expr",
["update", "replace", "delete"],
- 3 /* expectedOplogCursorReturnedDocs */);
+ 3 /* expectedOplogRetDocsForEachShard */);
// Ensure that for the '$nin' with one valid and one invalid operation type, rewrite to the
// oplog format will be abandoned. The oplog cursor should return '4' documents for each shard.
@@ -421,14 +424,14 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {operationType: {$nin: ["insert", "unknown"]}}},
"change_stream_rewritten_op_type_with_nin_expr_with_one_invalid_op_type",
["update", "replace", "delete"],
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' with '$nin' with operation type as number is rewritten correctly.
verifyNonInvalidatingOps(resumeAfterToken,
{$match: {operationType: {$nin: [1]}}},
"change_stream_rewritten_op_type_with_nin_expr_with_number_op_type",
["insert", "update", "replace", "delete"],
- 4 /* expectedOplogCursorReturnedDocs */);
+ 4 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' to match only 'insert' operations is rewritten
// correctly.
@@ -436,7 +439,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$eq: ["$operationType", "insert"]}}},
"change_stream_rewritten_op_type_eq_insert_in_expr",
["insert"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' to match only 'update' operations is rewritten
// correctly.
@@ -444,7 +447,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$eq: ["$operationType", "update"]}}},
"change_stream_rewritten_op_type_eq_update_in_expr",
["update"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' to match only 'replace' operations is rewritten
// correctly.
@@ -452,7 +455,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$eq: ["$operationType", "replace"]}}},
"change_stream_rewritten_op_type_eq_replace_in_expr",
["replace"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' to match only 'delete' operations is rewritten
// correctly.
@@ -460,7 +463,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$eq: ["$operationType", "delete"]}}},
"change_stream_rewritten_op_type_eq_delete_in_expr",
["delete"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' is rewritten correctly when comparing with 'unknown'
// operation type.
@@ -468,7 +471,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$eq: ["$operationType", "unknown"]}}},
"change_stream_rewritten_op_type_eq_unknown_in_expr",
[] /* expectedOps */,
- 0 /* expectedOplogCursorReturnedDocs */);
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' is rewritten correctly when '$and' is in the
// expression.
@@ -485,7 +488,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
},
"change_stream_rewritten_op_type_in_expr_with_and",
["delete"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' is rewritten correctly when '$or' is in the
// expression.
@@ -502,7 +505,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
},
"change_stream_rewritten_op_type_in_expr_with_or",
["update", "replace", "delete"],
- 3 /* expectedOplogCursorReturnedDocs */);
+ 3 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' is rewritten correctly when '$not' is in the
// expression.
@@ -511,7 +514,7 @@ assert.commandWorked(db.createCollection(collNameAlternate));
{$match: {$expr: {$not: {$regexMatch: {input: "$operationType", regex: /e$/}}}}},
"change_stream_rewritten_op_type_in_expr_with_not",
["insert"],
- 1 /* expectedOplogCursorReturnedDocs */);
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' using '$expr' is rewritten correctly when nor ({$not: {$or: [...]}})
// is in the expression.
@@ -530,7 +533,593 @@ assert.commandWorked(db.createCollection(collNameAlternate));
},
"change_stream_rewritten_op_type_in_expr_with_nor",
["update", "replace"],
- 2 /* expectedOplogCursorReturnedDocs */);
+ 2 /* expectedOplogRetDocsForEachShard */);
+})();
+
+(function testNamespaceRewrites() {
+ // A helper that opens a change stream on the whole cluster with the user supplied match
+ // expression 'userMatchExpr' and validates that:
+ // 1. for each shard, the events are seen in that order as specified in 'expectedResult'
+ // 2. the filtering is been done at oplog level
+ const verifyOnWholeCluster = (resumeAfterToken,
+ userMatchExpr,
+ expectedResult,
+ expectedOplogRetDocsForEachShard) => {
+ const cursor = db.getSiblingDB("admin").aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ]);
+
+ for (const [collOrDb, opDict] of Object.entries(expectedResult)) {
+ for (const [op, eventIdentifierList] of Object.entries(opDict)) {
+ eventIdentifierList.forEach(eventIdentifier => {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+
+ if (op == "dropDatabase") {
+ assert.eq(event.ns.db, eventIdentifier, event);
+ } else if (op == "insert") {
+ assert.eq(event.documentKey._id, eventIdentifier, event);
+ } else if (op == "rename") {
+ assert.eq(event.to.coll, eventIdentifier, event);
+ } else if (op == "drop") {
+ assert.eq(event.ns.coll, eventIdentifier);
+ } else {
+ assert(false, event);
+ }
+
+ if (op != "dropDatabase") {
+ assert.eq(event.ns.coll, collOrDb);
+ }
+ });
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+ const stats = db.getSiblingDB("admin").runCommand({
+ explain: {
+ aggregate: 1,
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ],
+ cursor: {batchSize: 0}
+ },
+ verbosity: "executionStats"
+ });
+
+ const execStatsShard0 = getOplogExecutionStatsForShard(stats, st.rs0.name);
+ assert.eq(execStatsShard0.nReturned, expectedOplogRetDocsForEachShard, execStatsShard0);
+
+ const execStatsShard1 = getOplogExecutionStatsForShard(stats, st.rs1.name);
+ assert.eq(execStatsShard1.nReturned, expectedOplogRetDocsForEachShard, execStatsShard1);
+ };
+
+ // Create some new collections to ensure that test cases has sufficient namespaces to verify
+ // that the namespace filtering is working correctly.
+ const coll2 = createShardedCollection("coll2", 4 /* splitAt */);
+ const coll3 = createShardedCollection("coll.coll3", 6 /* splitAt */);
+ const coll4 = createShardedCollection("coll4", 10 /* splitAt */);
+
+ // Open a change stream and store the resume token. This resume token will be used to replay the
+ // stream after this point.
+ const resumeAfterToken =
+ db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+ // For each collection, insert 2 documents, one on each shard. These will create oplog events
+ // and change stream will apply various namespace filtering on these collections to verify that
+ // the namespace is rewritten correctly. Each documents also contains field names matching with
+ // that of '$cmd' operations, ie. 'renameCollection', 'drop' and 'dropDatabase', but with
+ // value-type other than strings. The 'ns' match filters should gracefully handle the type
+ // mismatch and not throw any error.
+ // Each of these inserted documents will be represented in this form in the oplog:
+ // {... "o": {"_id": <id>, "renameCollection": true, "drop": {}, "dropDatabase": null}, ...}
+ // A few collections are renamed and dropped to verify that these are filtered properly.
+ assert.commandWorked(
+ coll.insert({_id: 1, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll.insert({_id: 2, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll2.insert({_id: 3, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll2.insert({_id: 4, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(coll2.renameCollection("newColl2"));
+ assert.commandWorked(
+ coll3.insert({_id: 5, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll3.insert({_id: 6, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert(coll3.drop());
+
+ // Insert some documents into 'coll4' with field names which match known command types. Despite
+ // the fact that these documents could potentially match with the partial 'ns' filter we rewrite
+ // into the oplog, the {op: "c"} predicate we add into the filter should ensure that they are
+ // correctly discarded.
+ assert.commandWorked(coll4.insert(
+ {_id: 7, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1}));
+ assert.commandWorked(
+ coll4.insert({_id: 8, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll4.insert({_id: 9, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""}));
+ assert.commandWorked(coll4.insert(
+ {_id: 10, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1}));
+ assert.commandWorked(
+ coll4.insert({_id: 11, renameCollection: true, drop: {}, dropDatabase: null}));
+ assert.commandWorked(
+ coll4.insert({_id: 12, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""}));
+
+ // This group of tests ensures that the '$match' on a particular namespace object only sees its
+ // documents and only required document(s) are returned at the oplog for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1"}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll2"}}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll.coll3"}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on the namespace with only db component should not emit any
+ // document and the oplog should not return any documents.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the namespace object with 'unknown' collection does not exists and the oplog
+ // cursor returns 0 document.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "unknown"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the namespace object with flipped fields does not match with the namespace object
+ // and the oplog cursor returns 0 document.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {coll: "coll1", db: dbName}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the namespace object with extra fields does not match with the namespace object
+ // and the oplog cursor returns 0 document.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1", extra: "extra"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the empty namespace object does not match with the namespace object and the oplog
+ // cursor returns 0 document.
+ verifyOnWholeCluster(resumeAfterToken, {$match: {ns: {}}}, {}, 0);
+
+ // Ensure the '$match' on namespace's db should return documents for all collection and oplog
+ // should return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": dbName}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // These cases ensure that the '$match' on regex of namespace' db, should return documents for
+ // all collection and oplog should return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^change_stream_match_pushdown.*$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^(change_stream_match_pushdown.*$)/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /(^unknown$|^change_stream_match_pushdown.*$)/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^unknown$|^change_stream_match_pushdown.*$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on non-existing db should not return any document and oplog should
+ // not return any document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": "unknown"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on empty db should not return any document and oplog should not
+ // return any document for each shard.
+ verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on sub field of db should not return any document and oplog should
+ // not return any document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db.extra": dbName}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // This group of tests ensures that the '$match' on collection field path should emit only the
+ // required documents and oplog should return only required document(s) for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll1"}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll2"}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll.coll3"}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // This group of tests ensures that the '$match' on the regex of the collection field path
+ // should emit only the required documents and oplog should return only required document(s) for
+ // each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*1/}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*2/}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*3/}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // This group of tests ensures that the '$match' on the regex matching all collections should
+ // return documents from all collection and oplog should return all document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^CoLL.*/i}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on the regex matching 3 collection should return documents from
+ // these collections and oplog should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*1$|^col.*2$|^col.*3$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}
+ },
+ 5 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on the regex to exclude 'coll1', 'coll2' and 'coll4' should return
+ // only documents from 'coll.coll3' and oplog should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^coll[^124]/}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on non-existing collection should not return any document and oplog
+ // should not return any document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "unknown"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on empty collection should not return any document and oplog should
+ // not return any document for each shard.
+ verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that the '$match' on sub field of collection should not return any document and oplog
+ // should not return any document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll.extra": "coll1"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' on db should return all documents and oplog should return all documents for
+ // each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [dbName]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // This group of tests ensures that '$in' on regex matching the db name should return all
+ // documents and oplog should return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [/^change_stream_match.*$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [/^change_stream_MATCH.*$/i]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that an empty '$in' on db path should not match any collection and oplog should not
+ // return any document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: []}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' with invalid db cannot be rewritten and oplog should return all documents
+ // for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [dbName, 1]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' on db path with mix of string and regex can be rewritten and oplog should
+ // return '0' document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' on multiple collections should return the required documents and oplog
+ // should return required documents for each shard.
+ verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns": {$in: [{db: dbName, coll: "coll1"}, {db: dbName, coll: "coll2"}]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", "coll2"]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' on regex of multiple collections should return the required documents and
+ // oplog should return required documents for each shard.
+ verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^coll1$/, /^coll2$/]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+ // This group of tests ensures that '$in' on regex of matching all collections should return all
+ // documents and oplog should return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^coll.*$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^COLL.*$/i]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that an empty '$in' should not match any collection and oplog should not return any
+ // document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: []}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' with invalid collection cannot be rewritten and oplog should return all
+ // documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", 1]}}},
+ {coll1: {insert: [1, 2]}},
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' with mix of string and regex matching collections can be rewritten and
+ // oplog should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", /^coll.*3$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$in' with mix of string and regex can be rewritten and oplog should
+ // return '0' document for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+    // This group of tests ensures that '$nin' on db path should return all documents and oplog
+ // should return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: []}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: ["unknown"]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [/^unknown$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+    // This group of tests ensures that '$nin' on matching db name should not return any documents
+ // and oplog should return '0' documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [dbName]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$nin' on multiple collections should return the required documents and oplog
+ // should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", "coll2", "coll4"]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$nin' on regex of multiple collections should return the required documents and
+ // oplog should return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: [/^coll1$/, /^coll2$/, /^coll4$/]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$nin' on regex of matching all collections should not return any document and
+ // oplog should return '0' documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: [/^coll.*$/, /^sys.*$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that an empty '$nin' should match all collections and oplog should return all
+ // documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: []}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$nin' with invalid collection cannot be rewritten and oplog should
+ // return all documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", 1]}}},
+ {
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ coll4: {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+ // Ensure that '$nin' with mix of string and regex can be rewritten and oplog should
+ // return required documents for each shard.
+ verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", /^coll2$/, "coll4"]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+ // At this stage, the coll2 has been renamed to 'newColl2' and coll3 has been dropped. The test
+ // from here will drop the database and ensure that the 'ns' filter when applied over the
+ // collection should only emit the 'drop' event for that collection and not the 'dropDatabase'
+    // event. It should be noted that for 'newColl2' and 'coll3', the 'dropDatabase' will be a no-op
+ // and will not emit any documents.
+
+ // Open a new change streams and verify that from here onwards the events related to
+ // 'dropDatabase' are seen.
+ const secondResumeAfterToken =
+ db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+ assert.commandWorked(db.dropDatabase());
+
+ // This group of tests ensures that the match on 'coll1' only sees the 'drop' events.
+ verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1"}}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {"ns.coll": "coll1"}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+ verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {"ns.coll": /^col.*1/}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+ verifyOnWholeCluster(
+ secondResumeAfterToken,
+ {$match: {ns: {db: dbName}}},
+ {change_stream_match_pushdown_and_rewrite_and_rewrite: {dropDatabase: [dbName, dbName]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
})();
st.stop();
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index af296aefed7..7bba41897d7 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -377,6 +377,343 @@ std::unique_ptr<MatchExpression> matchRewriteFullDocument(
return rewrittenPredicate;
}
+// Helper to rewrite predicates on any change stream namespace field of the form {db: "dbName",
+// coll: "collName"} into the oplog.
+
+// - By default, the rewrite is performed onto the given 'nsField' which specifies an oplog field
+// containing a complete namespace string, e.g. {ns: "dbName.collName"}.
+// - If 'nsFieldIsCmdNs' is true, then 'nsField' only contains the command-namespace of the
+// database, i.e. "dbName.$cmd".
+// - With 'nsFieldIsCmdNs' set to true, the caller can also optionally provide 'collNameField' which
+// is the field containing the collection name. The 'collNameField' may be absent, which means
+// that the operation being rewritten has a 'db' field in the change stream event, but no 'coll'
+// field.
+std::unique_ptr<MatchExpression> matchRewriteGenericNamespace(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const PathMatchExpression* predicate,
+ StringData nsField,
+ bool nsFieldIsCmdNs = false,
+ boost::optional<StringData> collNameField = boost::none) {
+ // A collection name can only be specified with 'nsFieldIsCmdNs' set to true.
+ tassert(5554100,
+ "Cannot specify 'collNameField' with 'nsFieldIsCmdNs' set to false",
+ !(!nsFieldIsCmdNs && collNameField));
+
+ // Performs a rewrite based on the type of argument specified in the MatchExpression.
+ auto getRewrittenNamespace = [&](auto&& nsElem) -> std::unique_ptr<MatchExpression> {
+ switch (nsElem.type()) {
+ case BSONType::Object: {
+ // Handles case with full namespace object, like '{ns: {db: "db", coll: "coll"}}'.
+ // There must be a single part to the field path, ie. 'ns'.
+ if (predicate->fieldRef()->numParts() > 1) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // Extract the object from the RHS of the predicate.
+ auto nsObj = nsElem.embeddedObject();
+
+ // If a full namespace, or a collNameField were specified, there must be 2 fields in
+ // the object, i.e. db and coll.
+ if ((!nsFieldIsCmdNs || collNameField) && nsObj.nFields() != 2) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+ // Otherwise, there can only be 1 field in the object, i.e. db.
+ if (nsFieldIsCmdNs && !collNameField && nsObj.nFields() != 1) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // Extract the db and collection from the 'ns' object. The 'collElem' will point to
+ // the eoo, if it is not present.
+ BSONObjIterator iter{nsObj};
+ auto dbElem = iter.next();
+ auto collElem = iter.next();
+
+ // Verify that the first field is 'db' and is of type string. We should always have
+ // a db entry no matter what oplog fields we are operating on.
+ if (dbElem.fieldNameStringData() != "db" || dbElem.type() != BSONType::String) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+ // Verify that the second field is 'coll' and is of type string, if it exists.
+ if (collElem &&
+ (collElem.fieldNameStringData() != "coll" ||
+ collElem.type() != BSONType::String)) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ if (nsFieldIsCmdNs) {
+ auto rewrittenPred = std::make_unique<AndMatchExpression>();
+ rewrittenPred->add(std::make_unique<EqualityMatchExpression>(
+ nsField, Value(dbElem.str() + ".$cmd")));
+
+ if (collNameField) {
+ // If we are rewriting to a combination of cmdNs and collName, we match on
+ // both.
+ rewrittenPred->add(std::make_unique<EqualityMatchExpression>(
+ *collNameField, Value(collElem.str())));
+ }
+ return rewrittenPred;
+ }
+
+ // Otherwise, we are rewriting to a full namespace field. Convert the object's
+ // subfields into an exact match on the oplog field.
+ return std::make_unique<EqualityMatchExpression>(
+ nsField, Value(dbElem.str() + "." + collElem.str()));
+ }
+ case BSONType::String: {
+ // Handles case with field path, like '{"ns.coll": "coll"}'. There must be 2 parts
+ // to the field path, ie. 'ns' and '[db | coll]'.
+ if (predicate->fieldRef()->numParts() != 2) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // Extract the second field and verify that it is either 'db' or 'coll'.
+ auto fieldName = predicate->fieldRef()->getPart(1);
+ if (fieldName != "db" && fieldName != "coll") {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // If the predicate is on 'coll' but we only have a db, we will never match.
+ if (fieldName == "coll" && nsFieldIsCmdNs && !collNameField) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // If the predicate is on 'db' and 'nsFieldIsCmdNs' is set to true, match the $cmd
+ // namespace.
+ if (nsFieldIsCmdNs && fieldName == "db") {
+ return std::make_unique<EqualityMatchExpression>(nsField,
+ Value(nsElem.str() + ".$cmd"));
+ }
+ // If the predicate is on 'coll', match the 'collNameField' if we have one.
+ if (collNameField && fieldName == "coll") {
+ return std::make_unique<EqualityMatchExpression>(*collNameField,
+ Value(nsElem.str()));
+ }
+
+ // Otherwise, we are rewriting this predicate to operate on a field containing the
+ // full namespace. If the predicate is on 'db', match all collections in that DB. If
+ // the predicate is on 'coll', match that collection in all DBs.
+ auto nsRegex = [&]() {
+ if (fieldName == "db") {
+ return "^" +
+ DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) +
+ "\\." + DocumentSourceChangeStream::kRegexAllCollections;
+ }
+ return DocumentSourceChangeStream::kRegexAllDBs + "\\." +
+ DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) +
+ "$";
+ }();
+
+ return std::make_unique<RegexMatchExpression>(nsField, nsRegex, "");
+ }
+ case BSONType::RegEx: {
+ // Handles case with field path having regex, like '{"ns.db": /^db$/}'. There must
+ // be 2 parts to the field path, ie. 'ns' and '[db | coll]'.
+ if (predicate->fieldRef()->numParts() != 2) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+                // Extract the second field and verify that it is either 'db' or 'coll'.
+ auto fieldName = predicate->fieldRef()->getPart(1);
+ if (fieldName != "db" && fieldName != "coll") {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // If the predicate is on 'coll' but we only have a db, we will never match.
+ if (fieldName == "coll" && nsFieldIsCmdNs && !collNameField) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ // Rather than attempting to rewrite the regex to apply to the oplog field, we will
+ // instead write an $expr to extract the dbName or collName from the oplog field,
+ // and apply the unmodified regex directly to it. First get a reference to the
+ // relevant field in the oplog entry.
+ std::string exprFieldRef = "'$" +
+ (fieldName == "db" ? nsField : (!nsFieldIsCmdNs ? nsField : *collNameField)) +
+ "'";
+
+ // Wrap the field in an expression to return MISSING if the field is not a string,
+ // since this expression may execute on CRUD oplog entries with clashing fieldnames.
+ // We will make this available to other expressions as the variable '$$oplogField'.
+ std::string exprOplogField = str::stream()
+ << "{$cond: {if: {$eq: [{$type: " << exprFieldRef
+ << "}, 'string']}, then: " << exprFieldRef << ", else: '$$REMOVE'}}";
+
+ // Now create an expression to extract the db or coll name from the oplog entry.
+ std::string exprDbOrCollName = [&]() -> std::string {
+ // If the query is on 'coll' and we have a collName field, use it as-is.
+ if (fieldName == "coll" && collNameField) {
+ return "'$$oplogField'";
+ }
+
+ // Otherwise, we need to split apart a full ns string. Find the separator.
+ // Return 0 if input is null in order to prevent throwing in $substrBytes.
+ std::string exprDotPos =
+ "{$ifNull: [{$indexOfBytes: ['$$oplogField', '.']}, 0]}";
+
+ // If the query is on 'db', return everything up to the separator.
+ if (fieldName == "db") {
+ return "{$substrBytes: ['$$oplogField', 0, " + exprDotPos + "]}";
+ }
+
+ // Otherwise, the query is on 'coll'. Return everything from (separator + 1)
+ // to the end of the string.
+ return str::stream() << "{$substrBytes: ['$$oplogField', {$add: [1, "
+ << exprDotPos << "]}, -1]}";
+ }();
+
+ // Convert the MatchExpression $regex into a $regexMatch on the corresponding field.
+ std::string exprRegexMatch = str::stream()
+ << "{$regexMatch: {input: " << exprDbOrCollName << ", regex: '"
+ << nsElem.regex() << "', options: '" << nsElem.regexFlags() << "'}}";
+
+ // Finally, wrap the regex in a $let which defines the '$$oplogField' variable.
+ std::string exprRewrittenPredicate = str::stream()
+ << "{$let: {vars: {oplogField: " << exprOplogField
+ << "}, in: " << exprRegexMatch << "}}";
+
+ // Return a new ExprMatchExpression with the rewritten $regexMatch.
+ return std::make_unique<ExprMatchExpression>(
+ BSON("" << fromjson(exprRewrittenPredicate)).firstElement(), expCtx);
+ }
+ default:
+ break;
+ }
+ return nullptr;
+ };
+
+ // It is only feasible to attempt to rewrite a limited set of predicates here.
+ switch (predicate->matchType()) {
+ case MatchExpression::EQ:
+ case MatchExpression::INTERNAL_EXPR_EQ: {
+ auto eqME = static_cast<const ComparisonMatchExpressionBase*>(predicate);
+ return getRewrittenNamespace(eqME->getData());
+ }
+ case MatchExpression::REGEX: {
+ // Create the BSON element from the regex match expression and return a rewritten match
+ // expression, if possible.
+ auto regME = static_cast<const RegexMatchExpression*>(predicate);
+ BSONObjBuilder regexBob;
+ regME->serializeToBSONTypeRegex(&regexBob);
+ return getRewrittenNamespace(regexBob.obj().firstElement());
+ }
+ case MatchExpression::MATCH_IN: {
+ auto inME = static_cast<const InMatchExpression*>(predicate);
+
+ // An empty '$in' should not match anything.
+ if (inME->getEqualities().empty() && inME->getRegexes().empty()) {
+ return std::make_unique<AlwaysFalseMatchExpression>();
+ }
+
+ auto rewrittenOr = std::make_unique<OrMatchExpression>();
+
+ // For each equality expression, add the rewritten sub-expression to the '$or'
+ // expression. Abandon the entire rewrite, if any of the rewrite fails.
+ for (const auto& elem : inME->getEqualities()) {
+ if (auto rewrittenExpr = getRewrittenNamespace(elem)) {
+ rewrittenOr->add(std::move(rewrittenExpr));
+ continue;
+ }
+ return nullptr;
+ }
+
+ // For each regex expression, add the rewritten sub-expression to the '$or' expression.
+ // Abandon the entire rewrite, if any of the rewrite fails.
+ for (const auto& regME : inME->getRegexes()) {
+ BSONObjBuilder regexBob;
+ regME->serializeToBSONTypeRegex(&regexBob);
+ if (auto rewrittenExpr = getRewrittenNamespace(regexBob.obj().firstElement())) {
+ rewrittenOr->add(std::move(rewrittenExpr));
+ continue;
+ }
+ return nullptr;
+ }
+ return rewrittenOr;
+ }
+ default:
+ break;
+ }
+
+ // If we have reached here, this is a predicate which we cannot rewrite.
+ return nullptr;
+}
+
+/**
+ * Rewrites filters on 'ns' in a format that can be applied directly to the oplog.
+ * Returns nullptr if the predicate cannot be rewritten.
+ */
+std::unique_ptr<MatchExpression> matchRewriteNs(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const PathMatchExpression* predicate,
+ bool allowInexact) {
+ // We should only ever see predicates on the 'ns' field.
+ tassert(5554101, "Unexpected empty path", !predicate->path().empty());
+ tassert(5554102,
+ str::stream() << "Unexpected predicate on " << predicate->path(),
+ predicate->fieldRef()->getPart(0) == DocumentSourceChangeStream::kNamespaceField);
+
+ //
+ // CRUD events
+ //
+
+ // CRUD ops are rewritten to the 'ns' field that contains a full namespace string.
+ auto crudNsRewrite = matchRewriteGenericNamespace(expCtx, predicate, "ns"_sd);
+
+ // If we can't rewrite this predicate for CRUD operations, then we don't expect to be able to
+ // rewrite it for any other operations either.
+ if (!crudNsRewrite) {
+ return nullptr;
+ }
+
+ // Create the final namespace filter for CRUD operations, i.e. {op: {$ne: 'c'}}.
+ auto crudNsFilter = std::make_unique<AndMatchExpression>();
+ crudNsFilter->add(
+ MatchExpressionParser::parseAndNormalize(fromjson("{op: {$ne: 'c'}}"), expCtx));
+ crudNsFilter->add(std::move(crudNsRewrite));
+
+ //
+ // Command events
+ //
+
+ // Group together all command event cases.
+ auto cmdCases = std::make_unique<OrMatchExpression>();
+
+ // The 'rename' event is rewritten to a field that contains the full namespace string.
+ auto renameNsRewrite = matchRewriteGenericNamespace(expCtx, predicate, "o.renameCollection"_sd);
+ tassert(5554103, "Unexpected rewrite failure", renameNsRewrite);
+ cmdCases->add(std::move(renameNsRewrite));
+
+ // The 'drop' event is rewritten to the cmdNs in 'ns' and the collection name in 'o.drop'.
+ auto dropNsRewrite = matchRewriteGenericNamespace(
+ expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */, "o.drop"_sd);
+ tassert(5554104, "Unexpected rewrite failure", dropNsRewrite);
+ cmdCases->add(std::move(dropNsRewrite));
+
+ // The 'dropDatabase' event is rewritten to the cmdNs in 'ns'. It does not have a collection
+ // field.
+ auto dropDbNsRewrite =
+ matchRewriteGenericNamespace(expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */);
+ tassert(5554105, "Unexpected rewrite failure", dropDbNsRewrite);
+ auto andDropDbNsRewrite = std::make_unique<AndMatchExpression>(std::move(dropDbNsRewrite));
+ andDropDbNsRewrite->add(std::make_unique<EqualityMatchExpression>("o.dropDatabase", Value(1)));
+ cmdCases->add(std::move(andDropDbNsRewrite));
+
+ // Create the final namespace filter for {op: 'c'} operations.
+ auto cmdNsFilter = std::make_unique<AndMatchExpression>();
+ cmdNsFilter->add(MatchExpressionParser::parseAndNormalize(fromjson("{op: 'c'}"), expCtx));
+ cmdNsFilter->add(std::move(cmdCases));
+
+ //
+ // Build final 'ns' filter
+ //
+
+ // Construct the final rewritten predicate from each of the rewrite categories.
+ auto rewrittenPredicate = std::make_unique<OrMatchExpression>();
+ rewrittenPredicate->add(std::move(crudNsFilter));
+ rewrittenPredicate->add(std::move(cmdNsFilter));
+
+ return rewrittenPredicate;
+}
+
// Map of fields names for which a simple rename is sufficient when rewriting.
StringMap<std::string> renameRegistry = {
{"clusterTime", "ts"}, {"lsid", "lsid"}, {"txnNumber", "txnNumber"}};
@@ -385,7 +722,8 @@ StringMap<std::string> renameRegistry = {
StringMap<MatchExpressionRewrite> matchRewriteRegistry = {
{"operationType", matchRewriteOperationType},
{"documentKey", matchRewriteDocumentKey},
- {"fullDocument", matchRewriteFullDocument}};
+ {"fullDocument", matchRewriteFullDocument},
+ {"ns", matchRewriteNs}};
// Map of field names to corresponding agg Expression rewrite functions.
StringMap<AggExpressionRewrite> exprRewriteRegistry = {{"operationType", exprRewriteOperationType}};
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 85eb5b8195d..496091eee1c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -61,19 +61,6 @@
#include "mongo/db/vector_clock.h"
namespace mongo {
-namespace {
-std::string regexEscape(StringData source) {
- std::string result = "";
- std::string escapes = "*+|()^?[]./\\$";
- for (const char& c : source) {
- if (escapes.find(c) != std::string::npos) {
- result.append("\\");
- }
- result += c;
- }
- return result;
-}
-} // namespace
using boost::intrusive_ptr;
using boost::optional;
@@ -144,11 +131,12 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
switch (type) {
case ChangeStreamType::kSingleCollection:
// Match the target namespace exactly.
- return "^" + regexEscape(nss.ns()) + "$";
+ return "^" + regexEscapeNsForChangeStream(nss.ns()) + "$";
case ChangeStreamType::kSingleDatabase:
// Match all namespaces that start with db name, followed by ".", then NOT followed by
// '$' or 'system.'
- return "^" + regexEscape(nss.db().toString()) + "\\." + kRegexAllCollections;
+ return "^" + regexEscapeNsForChangeStream(nss.db().toString()) + "\\." +
+ kRegexAllCollections;
case ChangeStreamType::kAllChangesForCluster:
// Match all namespaces that start with any db name other than admin, config, or local,
// followed by ".", then NOT followed by '$' or 'system.'.
@@ -163,7 +151,7 @@ std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const Namesp
switch (type) {
case ChangeStreamType::kSingleCollection:
// Match the target collection exactly.
- return "^" + regexEscape(nss.coll()) + "$";
+ return "^" + regexEscapeNsForChangeStream(nss.coll()) + "$";
case ChangeStreamType::kSingleDatabase:
case ChangeStreamType::kAllChangesForCluster:
// Match any collection; database filtering will be done elsewhere.
@@ -179,7 +167,7 @@ std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const Names
case ChangeStreamType::kSingleCollection:
case ChangeStreamType::kSingleDatabase:
// Match the target database command namespace exactly.
- return "^" + regexEscape(nss.getCommandNS().ns()) + "$";
+ return "^" + regexEscapeNsForChangeStream(nss.getCommandNS().ns()) + "$";
case ChangeStreamType::kAllChangesForCluster:
// Match all command namespaces on any database.
return kRegexAllDBs + "\\." + kRegexCmdColl;
@@ -188,6 +176,18 @@ std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const Names
}
}
+std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData source) {
+ std::string result = "";
+ std::string escapes = "*+|()^?[]./\\$";
+ for (const char& c : source) {
+ if (escapes.find(c) != std::string::npos) {
+ result.append("\\");
+ }
+ result += c;
+ }
+ return result;
+}
+
ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec(
const DocumentSourceChangeStreamSpec& spec) {
if (spec.getStartAfter()) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 5cf43b37988..3bd12f97a89 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -184,6 +184,8 @@ public:
static std::string getCollRegexForChangeStream(const NamespaceString& nss);
static std::string getCmdNsRegexForChangeStream(const NamespaceString& nss);
+ static std::string regexEscapeNsForChangeStream(StringData source);
+
/**
* Parses a $changeStream stage from 'elem' and produces the $match and transformation
* stages required.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index a0479604175..27e9b03097f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -3954,7 +3954,37 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeT
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-using ChangeStreamRewriteTest = AggregationContextFixture;
+class ChangeStreamRewriteTest : public AggregationContextFixture {
+public:
+ std::string getNsDbRegexMatchExpr(const std::string& field, const std::string& regex) {
+ return str::stream()
+ << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field
+ << "']}, {$const: 'string'}]}, '" << field
+ << "', '$$REMOVE']}}, in: {$regexMatch: {input: {$substrBytes: ['$$oplogField', "
+ "{$const: 0}, {$ifNull: [{$indexOfBytes: ['$$oplogField', {$const: "
+ "'.'}]}, {$const: 0}]}]}, regex: {$const: '"
+ << regex << "'}, options: {$const: ''}}}}}}";
+ }
+
+ std::string getNsCollRegexMatchExpr(const std::string& field, const std::string& regex) {
+ if (field == "$o.drop") {
+ return str::stream()
+ << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field
+ << "']}, {$const: 'string'}]}, '" << field
+ << "', '$$REMOVE']}}, in: {$regexMatch: {input: '$$oplogField', regex: {$const: '"
+ << regex << "'}, options: {$const: ''}}}}}}";
+ }
+
+ return str::stream()
+ << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field
+ << "']}, {$const: 'string'}]}, '" << field
+ << "', '$$REMOVE']}}, in: {$regexMatch: {input: {$substrBytes: ['$$oplogField', {$add: "
+ "[{$const: 1}, "
+ "{$ifNull: [{$indexOfBytes: ['$$oplogField', {$const: '.'}]}, {$const: 0}]}]}, "
+ "{$const: -1}]}, regex: {$const: '"
+ << regex << "'}, options: {$const: ''}} }}}}";
+ }
+};
TEST_F(ChangeStreamRewriteTest, RewriteOrPredicateOnRenameableFields) {
auto spec = fromjson("{$or: [{clusterTime: {$type: [17]}}, {lsid: {$type: [16]}}]}");
@@ -4707,5 +4737,815 @@ TEST_F(ChangeStreamRewriteTest, CannotExactlyRewritePredicateOnFieldFullDocument
// rewritten predicate that matches exactly.
ASSERT(rewrittenMatchExpression == nullptr);
}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(
+ BSON("ns" << BSON("db" << expCtx->ns.db() << "coll" << expCtx->ns.coll())), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ const std::string ns = expCtx->ns.db().toString() + "." + expCtx->ns.coll().toString();
+ const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd";
+
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), BSON("ns" << BSON("$eq" << ns)))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON("o.renameCollection" << BSON("$eq" << ns)),
+ BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)),
+ BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())))),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithSwappedField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(
+ BSON("ns" << BSON("coll" << expCtx->ns.coll() << "db" << expCtx->ns.db())), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyDbField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns" << BSON("db" << expCtx->ns.db())), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1}"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyCollectionField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns" << BSON("coll" << expCtx->ns.coll())), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidDbField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(
+ BSON("ns" << BSON("db" << 1 << "coll" << expCtx->ns.coll())), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidCollField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(
+ BSON("ns" << BSON("db" << expCtx->ns.db() << "coll" << 1)), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithExtraField) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(
+ fromjson("{ns: {db: 'db', coll: 'coll', extra: 'extra'}}"), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithStringDbFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns.db" << expCtx->ns.db()), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ const std::string regexNs = "^" + expCtx->ns.db().toString() + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string cmdNs = expCtx->ns.db().toString() + ".$cmd";
+
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON("ns" << BSON("$regex" << regexNs)))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)),
+ BSON("ns" << BSON("$eq" << cmdNs)),
+ BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithCollectionFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns.coll" << expCtx->ns.coll()), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ const std::string regexNs = DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." +
+ expCtx->ns.coll().toString() + "$";
+
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON("ns" << BSON("$regex" << regexNs)))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)),
+ BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())),
+ BSON(AND(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexDbFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns.db" << BSONRegEx(R"(^unit.*$)")), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^unit.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")),
+ BSON(AND(fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexCollectionFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression =
+ MatchExpressionParser::parse(BSON("ns.coll" << BSONRegEx(R"(^pipeline.*$)")), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^pipeline.*$)")))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
+ R"(^pipeline.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^pipeline.*$)")),
+ BSON(AND(fromjson("{ $alwaysFalse: 1 }"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInvalidDbFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.db" << 1), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInvalidCollectionFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.coll" << 1), expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraDbFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.db.subField"
+ << "subDb"),
+ expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraCollectionFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.coll.subField"
+ << "subColl"),
+ expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInvalidFieldPath) {
+ auto expCtx = getExpCtx();
+ auto statusWithMatchExpression = MatchExpressionParser::parse(BSON("ns.unknown"
+ << "test"),
+ expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ getExpCtx(), statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{$alwaysFalse: 1 }"),
+ BSON(AND(fromjson("{$alwaysFalse: 1 }"),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDb) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.db': {$in: ['test', 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string firstRegexNs = std::string("^") + "news" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string secondRegexNs = std::string("^") + "test" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string firstCmdNs = "news.$cmd";
+ const std::string secondCmdNs = "test.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << firstRegexNs)),
+ BSON("ns" << BSON("$regex" << secondRegexNs)))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)),
+ BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))),
+ BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)),
+ BSON("ns" << BSON("$eq" << secondCmdNs)))),
+ BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)),
+ BSON("ns" << BSON("$eq" << secondCmdNs)))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDb) {
+ auto expCtx = getExpCtx();
+ auto expr = BSON("ns.db" << BSON("$nin" << BSON_ARRAY("test"
+ << "news")));
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string firstRegexNs = std::string("^") + "test" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string secondRegexNs = std::string("^") + "news" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string firstCmdNs = "test.$cmd";
+ const std::string secondCmdNs = "news.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ BSON("ns" << BSON("$regex" << firstRegexNs)))))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))),
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ BSON("ns" << BSON("$eq" << firstCmdNs)))),
+ BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ BSON("ns" << BSON("$eq" << firstCmdNs)))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollection) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.coll': {$in: ['test', 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string firstRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "news" + "$";
+ const std::string secondRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "test" + "$";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << firstRegexNs)),
+ BSON("ns" << BSON("$regex" << secondRegexNs)))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)),
+ BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ BSON("o.drop" << BSON("$eq"
+ << "test")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnCollection) {
+ auto expCtx = getExpCtx();
+ auto expr = BSON("ns.coll" << BSON("$nin" << BSON_ARRAY("test"
+ << "news")));
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string firstRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "test" + "$";
+ const std::string secondRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs.toString() + "\\." + "news" + "$";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ BSON("ns" << BSON("$regex" << firstRegexNs)))))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ BSON("o.drop" << BSON("$eq"
+ << "test")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnDb) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.db': {$in: [/^test.*$/, /^news$/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnDb) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.db': {$nin: [/^test.*$/, /^news$/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
+ BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnCollection) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.coll': {$in: [/^test.*$/, /^news$/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^news$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnCollection) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.coll': {$nin: [/^test.*$/, /^news$/]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^news$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))),
+ BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDbWithRegexAndString) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.db': {$in: [/^test.*$/, 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string secondRegexNs = std::string("^") + "news" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string secondCmdNs = "news.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))),
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDbWithRegexAndString) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.db': {$nin: [/^test.*$/, 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+
+ const std::string secondRegexNs = std::string("^") + "news" + "\\." +
+ DocumentSourceChangeStream::kRegexAllCollections.toString();
+ const std::string secondCmdNs = "news.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))))),
+ BSON(AND(
+ fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(
+ BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))),
+ BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)),
+ fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollectionWithRegexAndString) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.coll': {$in: [/^test.*$/, 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string secondRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs + "\\." + "news" + "$";
+ const std::string secondCmdNs = "news.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
+ R"(^test.*$)")))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+TEST_F(ChangeStreamRewriteTest,
+ CanRewriteNamespaceWithNinExpressionOnCollectionWithRegexAndString) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{'ns.coll': {$nin: [/^test.*$/, 'news']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ const std::string secondRegexNs =
+ DocumentSourceChangeStream::kRegexAllDBs + "\\." + "news" + "$";
+ const std::string secondCmdNs = "news.$cmd";
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(
+ rewrittenPredicate,
+ BSON(NOR(BSON(OR(
+ BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"),
+ BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))),
+ BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+ BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)),
+ fromjson(getNsCollRegexMatchExpr("$o.renameCollection",
+ R"(^test.*$)")))),
+ BSON(OR(BSON("o.drop" << BSON("$eq"
+ << "news")),
+ fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))),
+ BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"),
+ fromjson("{$alwaysFalse: 1}"))),
+ fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
+// Verifies that a $in on 'ns.db' containing a non-string element (the number 1)
+// cannot be rewritten: rewriteFilterForFields must return null rather than
+// produce an oplog predicate for an invalid database name.
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidDb) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.db': {$in: ['test', 1]}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    // A null result signals that no rewrite was performed.
+    ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+// Same as the $in variant above, but for $nin: a non-string element (1) in the
+// 'ns.db' list must make the rewrite bail out and return null.
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithNinExpressionOnInvalidDb) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.db': {$nin: ['test', 1]}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    // A null result signals that no rewrite was performed.
+    ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+// Verifies that a $in on 'ns.coll' containing a non-string element (1) cannot be
+// rewritten: rewriteFilterForFields must return null for an invalid collection name.
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidCollection) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.coll': {$in: ['coll', 1]}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    // A null result signals that no rewrite was performed.
+    ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+// Same as the $in variant above, but for $nin: a non-string element (1) in the
+// 'ns.coll' list must make the rewrite bail out and return null.
+TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithNinExpressionOnInvalidCollection) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.coll': {$nin: ['coll', 1]}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    // A null result signals that no rewrite was performed.
+    ASSERT_FALSE(rewrittenMatchExpression);
+}
+
+// Verifies that an empty $in on 'ns.db' IS rewritable: since no namespace can
+// match an empty list, every branch of the rewritten oplog predicate collapses
+// to $alwaysFalse (for both non-command and command entries).
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyInExpression) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.db': {$in: []}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    ASSERT(rewrittenMatchExpression);
+
+    auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+    ASSERT_BSONOBJ_EQ(
+        rewrittenPredicate,
+        BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+                BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+                         BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+                                 fromjson("{$alwaysFalse: 1 }"),
+                                 BSON(AND(fromjson("{$alwaysFalse: 1 }"),
+                                          fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))));
+}
+
+// Verifies that an empty $nin on 'ns.db' is rewritten as the negation of the
+// empty-$in case above: the same all-$alwaysFalse predicate wrapped in a $nor,
+// so it matches every oplog entry.
+TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyNinExpression) {
+    auto expCtx = getExpCtx();
+    auto expr = fromjson("{'ns.db': {$nin: []}}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+    ASSERT(rewrittenMatchExpression);
+
+    auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+    ASSERT_BSONOBJ_EQ(
+        rewrittenPredicate,
+        BSON(NOR(
+            BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))),
+                    BSON(AND(fromjson("{op: {$eq: 'c'}}"),
+                             BSON(OR(fromjson("{$alwaysFalse: 1 }"),
+                                     fromjson("{$alwaysFalse: 1 }"),
+                                     BSON(AND(fromjson("{$alwaysFalse: 1 }"),
+                                              fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
+}
+
} // namespace
} // namespace mongo