summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-09-23 19:18:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-23 19:42:07 +0000
commita81410d0be7398b49b9c65db47d3760bcd77b954 (patch)
tree6cf6e596784ac82c8b9cc9958dad4e3bbad926b0
parent1482ec7b3af3440a06176939f8fc835ba6394fbc (diff)
downloadmongo-a81410d0be7398b49b9c65db47d3760bcd77b954.tar.gz
SERVER-59421 Add $expr rewrite for "ns" field.
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js5
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js612
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp101
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp180
4 files changed, 851 insertions, 47 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
index add6cd1b1bc..4a293da3290 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
@@ -15,6 +15,7 @@
"use strict";
load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const dbName = "change_stream_match_pushdown_documentKey_rewrite";
const collName = "change_stream_match_pushdown_documentKey_rewrite";
@@ -94,6 +95,10 @@ assert.commandWorked(coll.deleteOne({_id: 3, shard: 1}));
// rewrites.
assert.commandWorked(
db.adminCommand({configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"}));
+FixtureHelpers.runCommandOnEachPrimary({
+ db: db.getSiblingDB("admin"),
+ cmdObj: {configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"}
+});
// Ensure that the '$match' on the 'insert', 'update', 'replace', and 'delete' operation types with
// various predicates are rewritten correctly.
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js
index 4972f1fa90a..f641a433f57 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js
@@ -14,9 +14,12 @@
"use strict";
load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const dbName = "change_stream_match_pushdown_and_rewrite";
+const otherDbName = "other_db";
const collName = "coll1";
+const otherCollName = "other_coll";
const st = new ShardingTest({
shards: 2,
@@ -26,9 +29,13 @@ const st = new ShardingTest({
const mongosConn = st.s;
const db = mongosConn.getDB(dbName);
-// Create a sharded collection.
+// Create a sharded collection in the main test database.
const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */);
+// Create a sharded collection in the "other" database.
+const otherColl =
+ createShardedCollection(st, "_id" /* shardKey */, otherDbName, otherCollName, 2 /* splitAt */);
+
// A helper that opens a change stream on the whole cluster with the user supplied match expression
// 'userMatchExpr' and validates that:
// 1. for each shard, the events are seen in that order as specified in 'expectedResult'
@@ -84,6 +91,16 @@ function verifyOnWholeCluster(
assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard);
}
+// Enable a failpoint that will prevent $expr match expressions from generating $_internalExprEq
+// or similar expressions. This ensures that the following test-cases only exercise the $expr
+// rewrites.
+assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"}));
+FixtureHelpers.runCommandOnEachPrimary({
+ db: db.getSiblingDB("admin"),
+ cmdObj: {configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"}
+});
+
// Create some new collections to ensure that test cases has sufficient namespaces to verify
// that the namespace filtering is working correctly.
const coll2 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll2", 4 /* splitAt */);
@@ -131,6 +148,12 @@ assert.commandWorked(coll4.insert({_id: 11, renameCollection: true, drop: {}, dr
assert.commandWorked(
coll4.insert({_id: 12, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""}));
+// These events from unmonitored collection should not been seen unexpectedly.
+assert.commandWorked(
+ otherColl.insert({_id: 1, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(
+ otherColl.insert({_id: 2, renameCollection: true, drop: {}, dropDatabase: null}));
+
// This group of tests ensures that the '$match' on a particular namespace object only sees its
// documents and only required document(s) are returned at the oplog for each shard.
verifyOnWholeCluster(resumeAfterToken,
@@ -138,18 +161,34 @@ verifyOnWholeCluster(resumeAfterToken,
{coll1: {insert: [1, 2]}},
1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "coll1"}]}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {ns: {db: dbName, coll: "coll2"}}},
{coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
2 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "coll2"}]}}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {ns: {db: dbName, coll: "coll.coll3"}}},
{"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "coll.coll3"}]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the namespace with only db component should not emit any document and
// the oplog should not return any documents.
verifyOnWholeCluster(
resumeAfterToken, {$match: {ns: {db: dbName}}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName}]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the namespace object with 'unknown' collection does not exists and the oplog cursor
// returns 0 document.
@@ -157,6 +196,10 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {ns: {db: dbName, coll: "unknown"}}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "unknown"}]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the namespace object with flipped fields does not match with the namespace object and
// the oplog cursor returns 0 document.
@@ -164,6 +207,10 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {ns: {coll: "coll1", db: dbName}}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {coll: "coll1", db: dbName}]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the namespace object with extra fields does not match with the namespace object and
// the oplog cursor returns 0 document.
@@ -171,10 +218,19 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {ns: {db: dbName, coll: "coll1", extra: "extra"}}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "unknown", extra: "extra"}]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the empty namespace object does not match with the namespace object and the oplog
// cursor returns 0 document.
verifyOnWholeCluster(resumeAfterToken, {$match: {ns: {}}}, {}, 0);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {}]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure the '$match' on namespace's db should return documents for all collection and oplog should
// return all documents for each shard.
@@ -187,6 +243,15 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.db", dbName]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
// These cases ensure that the '$match' on regex of namespace' db, should return documents for all
// collection and oplog should return all documents for each shard.
@@ -199,6 +264,16 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.db", regex: "^change_stream_match_pushdown.*$"}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": /^(change_stream_match_pushdown.*$)/}},
{
@@ -208,6 +283,19 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match:
+ {$expr: {$regexMatch: {input: "$ns.db", regex: "(^change_stream_match_pushdown.*$)"}}}
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}},
{
@@ -217,6 +305,23 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $regexMatch:
+ {input: "$ns.db", regex: "^(Change_Stream_MATCH_PUSHDOWN.*$)", options: "i"}
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": /(^unknown$|^change_stream_match_pushdown.*$)/}},
{
@@ -226,6 +331,23 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $regexMatch:
+ {input: "$ns.db", regex: "(^unknown$|^change_stream_match_pushdown.*$)"}
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": /^unknown$|^change_stream_match_pushdown.*$/}},
{
@@ -235,16 +357,40 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $regexMatch: {input: "$ns.db", regex: "^unknown$|^change_stream_match_pushdown.*$"}
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on non-existing db should not return any document and oplog should not
// return any document for each shard.
verifyOnWholeCluster(
resumeAfterToken, {$match: {"ns.db": "unknown"}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.db", "unknown"]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on empty db should not return any document and oplog should not return
// any document for each shard.
verifyOnWholeCluster(
resumeAfterToken, {$match: {"ns.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.db", ""]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on sub field of db should not return any document and oplog should not
// return any document for each shard.
@@ -252,6 +398,10 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db.extra": dbName}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.db.extra", "unknown"]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// This group of tests ensures that the '$match' on collection field path should emit only the
// required documents and oplog should return only required document(s) for each shard.
@@ -260,13 +410,25 @@ verifyOnWholeCluster(resumeAfterToken,
{coll1: {insert: [1, 2]}},
1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", "coll1"]}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": "coll2"}},
{coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
2 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", "coll2"]}}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": "coll.coll3"}},
{"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", "coll.coll3"]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
// This group of tests ensures that the '$match' on the regex of the collection field path should
// emit only the required documents and oplog should return only required document(s) for each
@@ -276,13 +438,25 @@ verifyOnWholeCluster(resumeAfterToken,
{coll1: {insert: [1, 2]}},
1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*1"}}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": /^col.*2/}},
{coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
2 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*2"}}}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": /^col.*3/}},
{"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*3"}}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
// This group of tests ensures that the '$match' on the regex matching all collections should return
// documents from all collection and oplog should return all document for each shard.
@@ -296,6 +470,15 @@ verifyOnWholeCluster(resumeAfterToken,
},
8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*"}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": /^CoLL.*/i}},
{
coll1: {insert: [1, 2]},
@@ -304,6 +487,16 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^CoLL.*", options: "i"}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the regex matching 3 collection should return documents from these
// collections and oplog should return required documents for each shard.
@@ -315,6 +508,15 @@ verifyOnWholeCluster(resumeAfterToken,
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}
},
5 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*1$|^col.*2$|^col.*3$"}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}
+ },
+ 5 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on the regex to exclude 'coll1', 'coll2' and 'coll4' should return only
// documents from 'coll.coll3' and oplog should return required documents for each shard.
@@ -322,6 +524,10 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": /^coll[^124]/}},
{"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^coll[^124]"}}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on non-existing collection should not return any document and oplog
// should not return any document for each shard.
@@ -329,11 +535,19 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": "unknown"}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", "unknown"]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on empty collection should not return any document and oplog should not
// return any document for each shard.
verifyOnWholeCluster(
resumeAfterToken, {$match: {"ns.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", ""]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that the '$match' on sub field of collection should not return any document and oplog
// should not return any document for each shard.
@@ -341,6 +555,10 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll.extra": "coll1"}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll.extra", "coll1"]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' on db should return all documents and oplog should return all documents for
// each shard.
@@ -353,9 +571,18 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns.db", [dbName]]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
-// This group of tests ensures that '$in' on regex matching the db name should return all documents
-// and oplog should return all documents for each shard.
+// This group of tests ensures that '$in' and equivalent '$expr' expression on regex matching the db
+// name should return all documents and oplog should return all documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$in: [/^change_stream_match.*$/]}}},
{
@@ -365,6 +592,16 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$or: [{$regexMatch: {input: "$ns.db", regex: "^change_stream_match.*$"}}]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$in: [/^change_stream_MATCH.*$/i]}}},
{
@@ -374,11 +611,33 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$regexMatch: {input: "$ns.db", regex: "^change_stream_MATCH.*$", options: "i"}}
+ ]
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
// Ensure that an empty '$in' on db path should not match any collection and oplog should not return
// any document for each shard.
verifyOnWholeCluster(
resumeAfterToken, {$match: {"ns.db": {$in: []}}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns.db", []]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' with invalid db cannot be rewritten and oplog should return all documents for
// each shard.
@@ -390,6 +649,18 @@ verifyOnWholeCluster(resumeAfterToken,
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
+ 9 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure tht '$expr' with mix of valid and invalid db names should return required documents at the
+// oplog for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns.db", [dbName, 1]]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
8 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' on db path with mix of string and regex can be rewritten and oplog should
@@ -398,6 +669,19 @@ verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$in: ["unknown1", /^unknown2$/]}}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$eq: ["$ns.db", "unknown1"]},
+ {$regexMatch: {input: "$ns.db", regex: "^unknown2$"}}
+ ]
+ }
+ }
+ },
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' on multiple collections should return the required documents and oplog should
// return required documents for each shard.
@@ -408,9 +692,19 @@ verifyOnWholeCluster(
3 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(
resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns", [{db: dbName, coll: "coll1"}, {db: dbName, coll: "coll2"}]]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
{$match: {"ns.coll": {$in: ["coll1", "coll2"]}}},
{coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns.coll", ["coll1", "coll2"]]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' on regex of multiple collections should return the required documents and oplog
// should return required documents for each shard.
@@ -419,9 +713,23 @@ verifyOnWholeCluster(
{$match: {"ns.coll": {$in: [/^coll1$/, /^coll2$/]}}},
{coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$regexMatch: {input: "$ns.coll", regex: "^coll1$"}},
+ {$regexMatch: {input: "$ns.coll", regex: "^coll2$"}}
+ ]
+ }
+ }
+ },
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
-// This group of tests ensures that '$in' on regex of matching all collections should return all
-// documents and oplog should return all documents for each shard.
+// This group of tests ensures that '$in' and equivalent '$expr' expression on regex of matching all
+// collections should return all documents and oplog should return all documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$in: [/^coll.*$/]}}},
{
@@ -432,6 +740,23 @@ verifyOnWholeCluster(resumeAfterToken,
},
8 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$regexMatch: {input: "$ns.coll", regex: "^coll.*$"}},
+ ]
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$in: [/^COLL.*$/i]}}},
{
coll1: {insert: [1, 2]},
@@ -440,6 +765,24 @@ verifyOnWholeCluster(resumeAfterToken,
"coll4": {insert: [7, 8, 9, 10, 11, 12]}
},
8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$regexMatch: {input: "$ns.coll", regex: "^COLL.*$", options: "i"}},
+ ]
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
// Ensure that an empty '$in' should not match any collection and oplog should not return any
// document for each shard.
@@ -453,7 +796,14 @@ verifyOnWholeCluster(resumeAfterToken,
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$in: ["coll1", 1]}}},
{coll1: {insert: [1, 2]}},
- 8 /* expectedOplogRetDocsForEachShard */);
+ 9 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$expr' on '$in' with mix of valid and invalid collections should return only
+// required documents at oplog for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$in: ["$ns.coll", ["coll1", 1]]}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$in' with mix of string and regex matching collections can be rewritten and oplog
// should return required documents for each shard.
@@ -464,87 +814,210 @@ verifyOnWholeCluster(resumeAfterToken,
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
},
3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$eq: ["$ns.coll", "coll1"]},
+ {$regexMatch: {input: "$ns.coll", regex: "^coll.*3$"}},
+ ]
+ }
+ }
+ },
+ {
+ coll1: {insert: [1, 2]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
-// Ensure that '$in' with mix of string and regex can be rewritten and oplog should return '0'
-// document for each shard.
+// Ensure that '$in' and equivalent '$expr' expression with mix of string and regex can be rewritten
+// and oplog should return '0' document for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$in: ["unknown1", /^unknown2$/]}}},
{},
0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$eq: ["$ns.coll", "unknown1"]},
+ {$regexMatch: {input: "$ns.coll", regex: "^unknown2$"}},
+ ]
+ }
+ }
+ },
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
-// This group of tests ensure that '$nin' on db path should return all documents and oplog should
-// return all documents for each shard.
+// This group of tests ensure that '$nin' and equivalent '$expr' expression on db path should return
+// all documents and oplog should return all documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$nin: []}}},
{
coll1: {insert: [1, 2]},
coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
- "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
},
- 8 /* expectedOplogRetDocsForEachShard */);
+ 9 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$nin: ["unknown"]}}},
{
coll1: {insert: [1, 2]},
coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
- "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
},
- 8 /* expectedOplogRetDocsForEachShard */);
+ 9 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$not: {$or: [{$eq: ["$ns.db", "unknown"]}]}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 9 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$nin: [/^unknown$/]}}},
{
coll1: {insert: [1, 2]},
coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
- "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
},
- 8 /* expectedOplogRetDocsForEachShard */);
+ 9 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {$expr: {$not: {$or: [{$regexMatch: {input: "$ns.db", regex: "^unknown$"}}]}}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 9 /* expectedOplogRetDocsForEachShard */);
-// These group of tests ensure that '$nin' on matching db name should not return any documents and
-// oplog should return '0' documents for each shard.
+// These group of tests ensure that '$nin' and equivalent '$expr' expression on matching db name
+// should only return documents from unmonitored db and oplog should return only required documents
+// from unmonitored db.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$nin: [dbName]}}},
- {},
- 0 /* expectedOplogRetDocsForEachShard */);
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$not: {$or: [{$eq: ["$ns.db", dbName]}]}}}},
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/]}}},
- {},
- 0 /* expectedOplogRetDocsForEachShard */);
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $not: {
+ $or: [{
+ $regexMatch:
+ {input: "$ns.db", regex: "change_stream_match_pushdown_and_rewr.*"}
+ }]
+ }
+ }
+ }
+ },
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
-// Ensure that '$nin' on multiple collections should return the required documents and oplog should
-// return required documents for each shard.
+// Ensure that '$nin' and equivalent '$expr' expression on multiple collections should return the
+// required documents and oplog should return required documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$nin: ["coll1", "coll2", "coll4"]}}},
- {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
- 2 /* expectedOplogRetDocsForEachShard */);
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$not: {$in: ["$ns.coll", ["coll1", "coll2", "coll4"]]}}}},
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
-// Ensure that '$nin' on regex of multiple collections should return the required documents and
-// oplog should return required documents for each shard.
+// Ensure that '$nin' and equivalent '$expr' expression on regex of multiple collections should
+// return the required documents and oplog should return required documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$nin: [/^coll1$/, /^coll2$/, /^coll4$/]}}},
- {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
- 2 /* expectedOplogRetDocsForEachShard */);
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $not: {
+ $or: [
+ {$regexMatch: {input: "$ns.coll", regex: "^coll1$"}},
+ {$regexMatch: {input: "$ns.coll", regex: "^coll2$"}},
+ {$regexMatch: {input: "$ns.coll", regex: "^coll4$"}}
+ ]
+ }
+ }
+ }
+ },
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
-// Ensure that '$nin' on regex of matching all collections should not return any document and oplog
-// should return '0' documents for each shard.
+// Ensure that '$nin' and equivalent '$expr' expression on regex of matching all collections should
+// return documents from unmonitored db and oplog should also return documentss for unmonitored db
+// each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$nin: [/^coll.*$/, /^sys.*$/]}}},
- {},
- 0 /* expectedOplogRetDocsForEachShard */);
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $not: {
+ $or: [
+ {$regexMatch: {input: "$ns.coll", regex: "^coll.*$"}},
+ {$regexMatch: {input: "$ns.coll", regex: "^sys.*$"}}
+ ]
+ }
+ }
+ }
+ },
+ {"other_coll": {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
-// Ensure that an empty '$nin' should match all collections and oplog should return all documents
-// for each shard.
+// Ensure that an empty '$nin' and equivalent '$expr' expression should match all collections and
+// oplog should return all documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$nin: []}}},
{
coll1: {insert: [1, 2]},
coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
- "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
},
- 8 /* expectedOplogRetDocsForEachShard */);
+ 9 /* expectedOplogRetDocsForEachShard */);
// Ensure that '$nin' with invalid collection cannot be rewritten and oplog should return all
// documents for each shard.
@@ -553,16 +1026,50 @@ verifyOnWholeCluster(resumeAfterToken,
{
coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
- coll4: {insert: [7, 8, 9, 10, 11, 12]}
+ coll4: {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 9 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$expr' with mix of valid and invalid collection should return required documents at
+// the oplog for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {$expr: {$not: {$in: ["$ns.coll", ["coll1", 1]]}}}},
+ {
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ coll4: {insert: [7, 8, 9, 10, 11, 12]},
+ "other_coll": {insert: [1, 2]}
},
8 /* expectedOplogRetDocsForEachShard */);
-// Ensure that '$nin' with mix of string and regex can be rewritten and oplog should return required
-// documents for each shard.
+// Ensure that '$nin' and equivalent '$expr' expression with mix of string and regex can be
+// rewritten and oplog should return required documents for each shard.
verifyOnWholeCluster(resumeAfterToken,
{$match: {"ns.coll": {$nin: ["coll1", /^coll2$/, "coll4"]}}},
- {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
- 2 /* expectedOplogRetDocsForEachShard */);
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $not: {
+ $or: [
+ {$in: ["$ns.coll", ["coll1", "coll4"]]},
+ {$regexMatch: {input: "$ns.coll", regex: "^coll2$"}},
+ ]
+ }
+ }
+ }
+ },
+ {
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "other_coll": {insert: [1, 2]}
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
// At this stage, the coll2 has been renamed to 'newColl2' and coll3 has been dropped. The test from
// here will drop the database and ensure that the 'ns' filter when applied over the collection
@@ -583,19 +1090,38 @@ verifyOnWholeCluster(secondResumeAfterToken,
{coll1: {drop: ["coll1", "coll1"]}},
1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName, coll: "coll1"}]}}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(secondResumeAfterToken,
{$match: {"ns.coll": "coll1"}},
{coll1: {drop: ["coll1", "coll1"]}},
1 /* expectedOplogRetDocsForEachShard */);
verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns.coll", "coll1"]}}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(secondResumeAfterToken,
{$match: {"ns.coll": /^col.*1/}},
{coll1: {drop: ["coll1", "coll1"]}},
1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {$expr: {$regexMatch: {input: "$ns.coll", regex: "^col.*1"}}}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+// Ensure that the '$ns' object containing only 'db' should see only the 'dropDatabase' event and
+// only the required documents gets returned at the oplog for each shard.
verifyOnWholeCluster(
secondResumeAfterToken,
{$match: {ns: {db: dbName}}},
{change_stream_match_pushdown_and_rewrite_and_rewrite: {dropDatabase: [dbName, dbName]}},
1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ secondResumeAfterToken,
+ {$match: {$expr: {$eq: ["$ns", {db: dbName}]}}},
+ {change_stream_match_pushdown_and_rewrite_and_rewrite: {dropDatabase: [dbName, dbName]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
st.stop();
})();
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index 8ba08a70623..07a81f04b8c 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -575,7 +575,7 @@ std::unique_ptr<MatchExpression> matchRewriteGenericNamespace(
// instead write an $expr to extract the dbName or collName from the oplog field,
// and apply the unmodified regex directly to it. First get a reference to the
// relevant field in the oplog entry.
- std::string exprFieldRef = "'$" +
+ const std::string exprFieldRef = "'$" +
(fieldName == "db" ? nsField : (!nsFieldIsCmdNs ? nsField : *collNameField)) +
"'";
@@ -610,12 +610,12 @@ std::unique_ptr<MatchExpression> matchRewriteGenericNamespace(
}();
// Convert the MatchExpression $regex into a $regexMatch on the corresponding field.
- std::string exprRegexMatch = str::stream()
+ const std::string exprRegexMatch = str::stream()
<< "{$regexMatch: {input: " << exprDbOrCollName << ", regex: '"
<< nsElem.regex() << "', options: '" << nsElem.regexFlags() << "'}}";
// Finally, wrap the regex in a $let which defines the '$$oplogField' variable.
- std::string exprRewrittenPredicate = str::stream()
+ const std::string exprRewrittenPredicate = str::stream()
<< "{$let: {vars: {oplogField: " << exprOplogField
<< "}, in: " << exprRegexMatch << "}}";
@@ -763,6 +763,98 @@ std::unique_ptr<MatchExpression> matchRewriteNs(
}
/**
+ * Attempt to rewrite a reference to the 'ns' field such that, when evaluated over an oplog
+ * document, it produces the expected change stream value for the field.
+ */
+boost::intrusive_ptr<Expression> exprRewriteNs(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ExpressionFieldPath* expr,
+ bool allowInexact) {
+ auto fieldPath = expr->getFieldPathWithoutCurrentPrefix();
+
+ // This function should only be called on the 'ns' field.
+ tassert(5942100,
+ str::stream() << "Unexpected field path" << fieldPath.fullPathWithPrefix(),
+ fieldPath.getFieldName(0) == DocumentSourceChangeStream::kNamespaceField);
+
+ // If the field path is not 'ns', 'ns.db' or 'ns.coll', it does not exist.
+ static const std::set<std::string> validSubFieldNames = {"ns.db", "ns.coll"};
+ if (fieldPath.getPathLength() > 1 && !validSubFieldNames.count(fieldPath.fullPath())) {
+ return ExpressionConstant::create(expCtx.get(), Value());
+ }
+
+ // Firstly, we can always extract the database name directly from the "ns" field. Create a $let
+ // expression which will make '$$dbName' available to all subsequent expressions. Note that we
+ // do not yet complete the 'in' part of the $let, since this depends on the exact fieldPath.
+ auto dbNameLetExpr =
+ "{$let: {vars: {dbName: {$substrBytes: ['$ns', 0, {$indexOfBytes: ['$ns', '.']}]}}, in: ";
+
+ // If the expression is on "ns.db", then we can simply complete and return the $let immediately.
+ if (fieldPath.getPathLength() == 2 && fieldPath.getFieldName(1) == "db") {
+ std::string rewrittenExpr = str::stream() << dbNameLetExpr << "'$$dbName' }}";
+ return Expression::parseExpression(
+ expCtx.get(), fromjson(rewrittenExpr), expCtx->variablesParseState);
+ }
+
+ // Otherwise, we need to compute the collection name for this event. We will build a $switch
+ // statement to do this. Create a vector to hold each of the cases.
+ std::vector<BSONObj> collCases;
+
+ // Helper function to extract the collection name from a given field, using the known $$dbName.
+ auto getCollFromNSField = [](StringData fieldName) -> std::string {
+ return str::stream() << "{$substrBytes: ['$" << fieldName
+ << "', {$add: [{$strLenBytes: '$$dbName'}, 1]}, -1]}";
+ };
+
+ /**
+ * NOTE: the list below MUST be kept up-to-date with any newly-added user-facing change stream
+ * opTypes that are derived from oplog events (as opposed to events which are generated by
+ * change stream stages themselves). Internal events of type {op: 'n'} are handled separately
+ * and do not need to be considered here.
+ */
+
+ // Cases for handling CRUD events.
+ collCases.push_back(fromjson(str::stream() << "{case: {$in: ['$op', ['i', 'u', 'd']]}, then: "
+ << getCollFromNSField("ns") << "}"));
+
+ // Cases for handling command events.
+ collCases.push_back(fromjson("{case: {$ne: ['$op', 'c']}, then: '$$REMOVE'}"));
+ collCases.push_back(fromjson("{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"));
+ collCases.push_back(
+ fromjson("{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"));
+ collCases.push_back(fromjson(str::stream()
+ << "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: "
+ << getCollFromNSField("o.renameCollection") << "}"));
+
+ // The default case, if nothing matches.
+ auto defaultCase = ExpressionConstant::create(expCtx.get(), Value())->serialize(false);
+
+ // Build the collection expression object...
+ BSONObjBuilder collExprBuilder;
+
+ BSONObjBuilder switchBuilder(collExprBuilder.subobjStart("$switch"));
+ switchBuilder.append("branches", collCases);
+ switchBuilder << "default" << defaultCase;
+ switchBuilder.doneFast();
+
+ auto collExpr = collExprBuilder.obj();
+
+ // Finally, wrap the expression in the $let which defines the '$$dbName' variable, and complete
+ // the 'in' parameter of the $let. If the length of the fieldPath is 1 then the field reference
+ // is '$ns' and we must construct the entire 'ns' object, with both 'db' and 'coll'. Otherwise,
+ // the field is '$ns.coll' and we can just return the 'collExpr' $switch we constructed above.
+ std::string rewrittenExpr = str::stream()
+ << dbNameLetExpr
+ << (fieldPath.getPathLength() == 1 ? "{db: '$$dbName', coll: " + collExpr.toString() + "}"
+ : collExpr.toString())
+ << "}}";
+
+ // Parse the expression BSON object into an Expression and return it.
+ return Expression::parseExpression(
+ expCtx.get(), fromjson(rewrittenExpr), expCtx->variablesParseState);
+}
+
+/**
* Rewrites filters on 'to' in a format that can be applied directly to the oplog.
* Returns nullptr if the predicate cannot be rewritten.
*/
@@ -799,7 +891,8 @@ StringMap<MatchExpressionRewrite> matchRewriteRegistry = {
// Map of field names to corresponding agg Expression rewrite functions.
StringMap<AggExpressionRewrite> exprRewriteRegistry = {{"operationType", exprRewriteOperationType},
- {"documentKey", exprRewriteDocumentKey}};
+ {"documentKey", exprRewriteDocumentKey},
+ {"ns", exprRewriteNs}};
// Traverse the Expression tree and rewrite as many of them as possible. Note that the rewrite is
// performed in-place; that is, the Expression passed into the function is mutated by it.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 67a832834c8..1088b8f8c4a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -5628,6 +5628,186 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyNinExpression) {
fromjson("{'o.dropDatabase': {$eq: 1}}"))))))))))));
}
+TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObject) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{$expr: {$eq: ['$ns', {db: '" + expCtx->ns.db() + "', coll: '" +
+ expCtx->ns.coll() + "'}]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto case1 =
+ "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
+ "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+ auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto case5 =
+ "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
+ "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+
+ auto expectedExpr = fromjson(
+ "{"
+ " $expr: {"
+ " $eq: [{"
+ " $let: {"
+ " vars: {"
+ " dbName: {$substrBytes: ["
+ " '$ns', "
+ " {$const: 0}, "
+ " {$indexOfBytes: ['$ns', {$const: '.'}]}]}}, "
+ " in: {"
+ " db: '$$dbName',"
+ " coll: {"
+ " $switch: {"
+ " branches: ["s +
+ " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ " ], default: '$$REMOVE'}}}}}, "
+ " {db: {$const: 'unittests' }, coll: {$const: 'pipeline_test'}}]}"
+ "}");
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, expectedExpr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObjectWithOnlyDb) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{$expr: {$eq: ['$ns', {db: '" + expCtx->ns.db() + "'}]}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto case1 =
+ "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
+ "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+ auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto case5 =
+ "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
+ "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+
+ auto expectedExpr = fromjson(
+ "{"
+ " $expr: {"
+ " $eq: [{"
+ " $let: {"
+ " vars: {"
+ " dbName: {$substrBytes: ["
+ " '$ns', "
+ " {$const: 0}, "
+ " {$indexOfBytes: ['$ns', {$const: '.'}]}]}}, "
+ " in: {"
+ " db: '$$dbName',"
+ " coll: {"
+ " $switch: {"
+ " branches: ["s +
+ " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ " ], default: '$$REMOVE'}}}}}, "
+ " {db: {$const: 'unittests' }}]}"
+ "}");
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, expectedExpr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnDbFieldPath) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{$expr: {$eq: ['$ns.db', '" + expCtx->ns.coll() + "']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto expectedExpr = fromjson(
+ "{"
+ " $expr: {"
+ " $eq: [{"
+ " $let: {"
+ " vars: {"
+ " dbName: {$substrBytes: ["
+ " '$ns', "
+ " {$const: 0}, "
+ " {$indexOfBytes: ['$ns', {$const: '.'}]}]}}, "
+ " in: '$$dbName' }},"
+ " {$const: 'pipeline_test'}]}"
+ "}");
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, expectedExpr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnCollFieldPath) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{$expr: {$eq: ['$ns.coll', '" + expCtx->ns.coll() + "']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto case1 =
+ "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: "
+ "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+ auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}";
+ auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}";
+ auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}";
+ auto case5 =
+ "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: "
+ "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: "
+ "-1}]}}";
+
+ auto expectedExpr = fromjson(
+ "{"
+ " $expr: {"
+ " $eq: [{"
+ " $let: {"
+ " vars: {"
+ " dbName: {$substrBytes: ["
+ " '$ns', "
+ " {$const: 0}, "
+ " {$indexOfBytes: ['$ns', {$const: '.'}]}]}}, "
+ " in: {"
+ " $switch: {"
+ " branches: ["s +
+ " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 +
+ " ], default: '$$REMOVE'}}}}, "
+ " {$const: 'pipeline_test'}]}"
+ "}");
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate, expectedExpr);
+}
+
+TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnInvalidFieldPath) {
+ auto expCtx = getExpCtx();
+ auto expr = fromjson("{$expr: {$eq: ['$ns.test', '" + expCtx->ns.coll() + "']}}");
+ auto statusWithMatchExpression = MatchExpressionParser::parse(expr, expCtx);
+ ASSERT_OK(statusWithMatchExpression.getStatus());
+
+ auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+ expCtx, statusWithMatchExpression.getValue().get(), {"ns"});
+ ASSERT(rewrittenMatchExpression);
+
+ auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+ ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+ fromjson("{$expr: {$eq: ['$$REMOVE', {$const: 'pipeline_test'}]}}"));
+}
+
TEST_F(ChangeStreamRewriteTest, CanRewriteFullToObject) {
auto expCtx = getExpCtx();
auto statusWithMatchExpression = MatchExpressionParser::parse(