author    Rishab Joshi <rishab.joshi@mongodb.com>  2021-09-16 11:44:08 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-09-16 13:53:02 +0000
commit    aa36306977363038be233195ac5645edb3d71dab (patch)
tree      59f8a1e7c258f44e0985bcea48ff4dac7cfe6d4c /jstests/change_streams
parent    9caae4e9c3cc4c0dc27967ad9e0f84284087a8de (diff)
download  mongo-aa36306977363038be233195ac5645edb3d71dab.tar.gz
SERVER-59457 [Part2] Allow change stream rewrite tests to run in whole-db and whole-cluster passthrough
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js            179
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js      199
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js     211
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js        601
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js   283
-rw-r--r--  jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js               442
6 files changed, 1915 insertions, 0 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js
new file mode 100644
index 00000000000..76c4ed2d1f2
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js
@@ -0,0 +1,179 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the $match and
+// apply it to oplog-format documents in order to filter out results as early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_and_rewrite";
+const collName = "coll1";
+const collNameAlternate = "change_stream_match_pushdown_and_rewrite_alternate";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// Create a sharded collection.
+const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */);
+
+// Create a second (unsharded) test collection for validating transactions that insert into multiple
+// collections.
+assert.commandWorked(db.createCollection(collNameAlternate));
+
+const changeStream = coll.aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}]);
+
+// Store a resume token that can be used to start the change stream from the beginning.
+const resumeAfterToken = changeStream.getResumeToken();
+
+// These commands will result in 6 oplog events, all but 2 of which will be filtered out by the
+// $match.
+assert.commandWorked(coll.insert({_id: 1, string: "Value"}));
+assert.commandWorked(coll.update({_id: 1}, {$set: {foo: "bar"}}));
+assert.commandWorked(coll.remove({_id: 1}));
+assert.commandWorked(coll.insert({_id: 2, string: "vAlue"}));
+assert.commandWorked(coll.update({_id: 2}, {$set: {foo: "bar"}}));
+assert.commandWorked(coll.remove({_id: 2}));
+
+// Verify correct operation of the change stream.
+assert.soon(() => changeStream.hasNext());
+const event1 = changeStream.next();
+assert.eq(event1.operationType, "insert", event1);
+assert.eq(event1.documentKey._id, 1, event1);
+
+assert.soon(() => changeStream.hasNext());
+const event2 = changeStream.next();
+assert.eq(event2.operationType, "insert", event2);
+assert.eq(event2.documentKey._id, 2, event2);
+
+assert(!changeStream.hasNext());
+changeStream.close();
+
+// Run the same change stream again, this time with "executionStats" to get more detailed
+// information about how many documents are processed by each stage. Note that 'event1' will not
+// be included in the result set this time, because we are using it as the resume point, but it
+// will still be returned from the shard and processed on the mongoS.
+const stats = coll.explain("executionStats").aggregate([
+ {$changeStream: {resumeAfter: event1._id}},
+ {$match: {operationType: "insert"}}
+]);
+
+// Verify the number of documents seen from each shard by the mongoS pipeline. Because we expect
+// the $match to be pushed down to the shards, we expect to only see the 1 "insert" operation on
+// each shard. All other operations should be filtered out on the shards.
+assertNumChangeStreamDocsReturnedFromShard(stats, st.rs0.name, 1);
+assertNumChangeStreamDocsReturnedFromShard(stats, st.rs1.name, 1);
+
+// Because it is possible to rewrite the {operationType: "insert"} predicate so that it applies
+// to the oplog entry, we expect the $match to get pushed all the way to the initial oplog
+// query. This query executes in an internal "$cursor" stage, and we expect to see exactly 1
+// document from this stage on each shard.
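+//
+// For illustration only (the exact rewritten filter shape is an implementation detail): inserts
+// are recorded in the oplog with op type "i", so a predicate like {operationType: "insert"} can
+// be translated into an oplog-level filter resembling {op: "i"}, alongside the stream's usual
+// namespace and timestamp bounds.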
+assertNumMatchingOplogEventsForShard(stats, st.rs0.name, 1);
+assertNumMatchingOplogEventsForShard(stats, st.rs1.name, 1);
+
+// Generate another 7 oplog events, this time within a transaction. One of the events is in a
+// different collection, to validate that events from outside the watched namespace get filtered
+// out even when within a transaction.
+const session = st.s.startSession({causalConsistency: true});
+const sessionColl = session.getDatabase(dbName)[collName];
+const sessionCollAlternate = session.getDatabase(dbName)[collNameAlternate];
+
+session.startTransaction({readConcern: {level: "majority"}});
+
+assert.commandWorked(sessionColl.insert({_id: 1}));
+assert.commandWorked(sessionColl.update({_id: 1}, {$set: {foo: "bar"}}));
+assert.commandWorked(sessionColl.remove({_id: 1}));
+assert.commandWorked(sessionCollAlternate.insert({_id: "alt"}));
+assert.commandWorked(sessionColl.insert({_id: 2}));
+assert.commandWorked(sessionColl.update({_id: 2}, {$set: {foo: "bar"}}));
+assert.commandWorked(sessionColl.remove({_id: 2}));
+
+assert.commandWorked(session.commitTransaction_forTesting());
+
+// Repeat the change stream from before, using a resume token to pick up from where the previous
+// change stream left off. This change stream will only observe the 6 operations that occur in the
+// transaction and will filter out everything except the 2 inserts.
+const txnChangeStream = coll.aggregate(
+ [{$changeStream: {resumeAfter: event2._id}}, {$match: {operationType: "insert"}}]);
+
+// Verify correct operation of the change stream.
+assert.soon(() => txnChangeStream.hasNext());
+const event3 = txnChangeStream.next();
+assert.eq(event3.operationType, "insert", event3);
+assert.eq(event3.documentKey._id, 1, event3);
+
+assert.soon(() => txnChangeStream.hasNext());
+const event4 = txnChangeStream.next();
+assert.eq(event4.operationType, "insert", event4);
+assert.eq(event4.documentKey._id, 2, event4);
+
+assert(!txnChangeStream.hasNext());
+txnChangeStream.close();
+
+// Run explain on the change stream to get more detailed execution information.
+const txnStatsAfterEvent2 = coll.explain("executionStats").aggregate([
+ {$changeStream: {resumeAfter: event2._id}},
+ {$match: {operationType: "insert"}}
+]);
+
+// Verify the number of documents seen from each shard by the mongoS pipeline. As before, we expect
+// that everything except the inserts will be filtered on the shard, limiting the number of events
+// the mongoS needs to retrieve.
+assertNumChangeStreamDocsReturnedFromShard(txnStatsAfterEvent2, st.rs0.name, 1);
+
+// Note that the event we are resuming from is sent to the mongoS from the second shard, even
+// though it gets filtered out, which is why we see 2 events here.
+assertNumChangeStreamDocsReturnedFromShard(txnStatsAfterEvent2, st.rs1.name, 2);
+
+// Generate a second transaction.
+session.startTransaction({readConcern: {level: "majority"}});
+
+assert.commandWorked(sessionColl.insert({_id: 1, a: 1, string: "vaLue"}));
+assert.commandWorked(sessionColl.update({_id: 1}, {$set: {foo: "bar"}}));
+assert.commandWorked(sessionColl.remove({_id: 1}));
+assert.commandWorked(sessionColl.insert({_id: 2, a: 2, string: "valUe"}));
+assert.commandWorked(sessionColl.update({_id: 2}, {$set: {foo: "bar"}}));
+assert.commandWorked(sessionColl.remove({_id: 2}));
+
+assert.commandWorked(session.commitTransaction_forTesting());
+
+// This change stream targets transactions from this session but filters out the first transaction.
+const txnStatsAfterEvent1 = coll.explain("executionStats").aggregate([
+ {$changeStream: {resumeAfter: event1._id}},
+ {$match: {operationType: "insert", lsid: event3.lsid, txnNumber: {$ne: event3.txnNumber}}}
+]);
+
+// The "lsid" and "txnNumber" filters should get pushed all the way to the initial oplog query
+// in the $cursor stage, meaning that every oplog entry gets filtered out except the
+// 'commitTransaction' on each shard for the one transaction we select with our filter.
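+//
+// For illustration only: oplog entries written on behalf of a transaction carry 'lsid' and
+// 'txnNumber' as top-level fields, so the rewritten oplog filter can take a form resembling
+// {lsid: <sessionId>, txnNumber: {$ne: <txnNumber>}} without first unpacking the transaction's
+// applyOps entries. The exact filter shape is an implementation detail.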
+assertNumMatchingOplogEventsForShard(txnStatsAfterEvent1, st.rs0.name, 1);
+assertNumMatchingOplogEventsForShard(txnStatsAfterEvent1, st.rs1.name, 1);
+
+// Ensure that optimization does not attempt to create a filter that disregards the collation.
+const collationChangeStream = coll.aggregate(
+ [{$changeStream: {resumeAfter: resumeAfterToken}}, {$match: {"fullDocument.string": "value"}}],
+ {collation: {locale: "en_US", strength: 2}});
+
+["Value", "vAlue", "vaLue", "valUe"].forEach(val => {
+ assert.soon(() => collationChangeStream.hasNext());
+ const fullDocumentEvent = collationChangeStream.next();
+ assert.eq(fullDocumentEvent.fullDocument.string, val, fullDocumentEvent);
+});
+
+assert(!collationChangeStream.hasNext());
+collationChangeStream.close();
+st.stop();
+})();
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
new file mode 100644
index 00000000000..add6cd1b1bc
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js
@@ -0,0 +1,199 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate
+// involving the 'documentKey' field can push down and rewrite the $match, making it part of the
+// oplog cursor's filter in order to filter out results as early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_documentKey_rewrite";
+const collName = "change_stream_match_pushdown_documentKey_rewrite";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// Create a sharded collection whose shard key is 'shard'.
+const coll = createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */);
+
+// A helper that opens a change stream with the user-supplied match expression 'userMatchExpr' and
+// validates that:
+// (1) for each shard, the events are seen in the order specified in 'expectedOps'; and
+// (2) each shard returns the expected number of events; and
+// (3) the number of docs returned by the oplog cursor on each shard matches what we expect
+// as specified in 'expectedOplogCursorReturnedDocs'.
+function verifyOps(resumeAfterToken, userMatchExpr, expectedOps, expectedOplogCursorReturnedDocs) {
+ const cursor =
+ coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]);
+
+ let expectedChangeStreamDocsReturned = [0, 0];
+ for (const [op, id, shardId] of expectedOps) {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+ assert.eq(event.documentKey._id, id, event);
+ assert.eq(event.documentKey.shard, shardId, event);
+ if (shardId == 0 || shardId == 1) {
+ ++expectedChangeStreamDocsReturned[shardId];
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+    // 'executionStats' explain output can only be captured for a non-invalidating stream.
+ const stats = coll.explain("executionStats")
+ .aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]);
+
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs0.name, expectedChangeStreamDocsReturned[0]);
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs1.name, expectedChangeStreamDocsReturned[1]);
+ assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogCursorReturnedDocs[0]);
+ assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogCursorReturnedDocs[1]);
+}
+
+// Open a change stream and store the resume token. This resume token will be used to replay the
+// stream after this point.
+const resumeAfterToken = coll.watch([]).getResumeToken();
+
+// These operations will create oplog events. The change stream will apply several filters to this
+// series of events and ensure that the '$match' expressions are rewritten correctly.
+assert.commandWorked(coll.insert({_id: 2, shard: 0}));
+assert.commandWorked(coll.insert({_id: 3, shard: 0, z: 4}));
+assert.commandWorked(coll.insert({_id: 2, shard: 1, z: 4}));
+assert.commandWorked(coll.insert({_id: 3, shard: 1}));
+assert.commandWorked(coll.update({_id: 2, shard: 0}, {$set: {foo: "a"}}));
+assert.commandWorked(coll.update({_id: 3, shard: 0}, {$set: {foo: "a"}}));
+assert.commandWorked(coll.update({_id: 2, shard: 1}, {$set: {foo: "a"}}));
+assert.commandWorked(coll.update({_id: 3, shard: 1}, {$set: {foo: "a"}}));
+assert.commandWorked(coll.replaceOne({_id: 2, shard: 0}, {_id: 2, shard: 0, foo: "b"}));
+assert.commandWorked(coll.replaceOne({_id: 3, shard: 0}, {_id: 3, shard: 0, z: 4, foo: "b"}));
+assert.commandWorked(coll.replaceOne({_id: 2, shard: 1}, {_id: 2, shard: 1, z: 4, foo: "b"}));
+assert.commandWorked(coll.replaceOne({_id: 3, shard: 1}, {_id: 3, shard: 1, foo: "b"}));
+assert.commandWorked(coll.deleteOne({_id: 2, shard: 0}));
+assert.commandWorked(coll.deleteOne({_id: 3, shard: 0}));
+assert.commandWorked(coll.deleteOne({_id: 2, shard: 1}));
+assert.commandWorked(coll.deleteOne({_id: 3, shard: 1}));
+
+// Enable a failpoint that will prevent $expr match expressions from being optimized into
+// $_internalExprEq or similar expressions. This ensures that the following test cases only
+// exercise the $expr rewrites.
+assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"}));
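+// For illustration only: with optimization enabled, a predicate such as
+// {$expr: {$eq: ["$documentKey._id", 2]}} may be converted into an internal form along the lines
+// of {"documentKey._id": {$_internalExprEq: 2}}, which would exercise the internal-expression
+// rewrite paths rather than the $expr rewrites that this test targets.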
+
+// Ensure that '$match' predicates on the 'insert', 'update', 'replace', and 'delete' operation
+// types are rewritten correctly.
+for (const op of ["insert", "update", "replace", "delete"]) {
+ // Test out a predicate on the full 'documentKey' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, documentKey: {shard: 0, _id: 2}}},
+ [[op, 2, 0]],
+ [1, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a predicate on 'documentKey._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey._id": 2}},
+ [[op, 2, 0], [op, 2, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a predicate on 'documentKey.shard'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey.shard": 1}},
+ [[op, 2, 1], [op, 3, 1]],
+ [0, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on the full 'documentKey' field. It's not possible to rewrite
+ // this predicate and make it part of the oplog filter, so we expect the oplog cursor to return
+ // 2 docs on each shard.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, documentKey: {$not: {$eq: {shard: 0, _id: 2}}}}},
+ [[op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on 'documentKey._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey._id": {$not: {$eq: 2}}}},
+ [[op, 3, 0], [op, 3, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this
+ // predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2
+ // docs on each shard.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey.shard": {$not: {$eq: 1}}}},
+ [[op, 2, 0], [op, 3, 0]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$exists: false}' predicate on a field that doesn't exist in 'documentKey' but
+ // that does exist in some of the underlying documents.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "documentKey.z": {$exists: false}}},
+ [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $expr predicate on the full 'documentKey' field.
+ verifyOps(
+ resumeAfterToken,
+ {
+ $match:
+ {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey", {shard: 0, _id: 2}]}}]}
+ },
+ [[op, 2, 0]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on the full 'documentKey' field.
+ verifyOps(resumeAfterToken,
+ {
+ $match: {
+ $and: [
+ {operationType: op},
+ {$expr: {$not: {$eq: ["$documentKey", {shard: 0, _id: 2}]}}}
+ ]
+ }
+ },
+ [[op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $expr predicate on 'documentKey._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey._id", 2]}}]}},
+ [[op, 2, 0], [op, 2, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $expr predicate on 'documentKey._id'.
+ verifyOps(
+ resumeAfterToken,
+ {$match: {$and: [{operationType: op}, {$expr: {$not: {$eq: ["$documentKey._id", 2]}}}]}},
+ [[op, 3, 0], [op, 3, 1]],
+ [1, 1] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out an $expr predicate on 'documentKey.shard'.
+ verifyOps(resumeAfterToken,
+ {$match: {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey.shard", 1]}}]}},
+ [[op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated $expr predicate on 'documentKey.shard'.
+ verifyOps(
+ resumeAfterToken,
+ {$match: {$and: [{operationType: op}, {$expr: {$not: {$eq: ["$documentKey.shard", 1]}}}]}},
+ [[op, 2, 0], [op, 3, 0]],
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+}
+
+st.stop();
+})();
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js
new file mode 100644
index 00000000000..35986a01d95
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js
@@ -0,0 +1,211 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate
+// involving the 'fullDocument' field can push down and rewrite the $match, making it part of the
+// oplog cursor's filter in order to filter out results as early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_fullDocument_rewrite";
+const collName = "change_stream_match_pushdown_fullDocument_rewrite";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+// Create a sharded collection whose shard key is 'shard'.
+const coll = createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */);
+
+// A helper that opens a change stream with the user-supplied match expression 'userMatchExpr' and
+// validates that:
+// (1) for each shard, the events are seen in the order specified in 'expectedOps'; and
+// (2) the number of docs returned by each shard matches what we expect as specified by
+// 'expectedChangeStreamDocsReturned'; and
+// (3) the number of docs returned by the oplog cursor on each shard matches what we expect as
+// specified in 'expectedOplogCursorReturnedDocs'.
+function verifyOps(resumeAfterToken,
+ userMatchExpr,
+ expectedOps,
+ expectedChangeStreamDocsReturned,
+ expectedOplogCursorReturnedDocs) {
+ const cursor = coll.aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}},
+ userMatchExpr
+ ]);
+
+ for (const [op, id, shardId] of expectedOps) {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+ if (id !== undefined) {
+ assert.eq(event.fullDocument._id, id, event);
+ }
+ if (shardId !== undefined) {
+ assert.eq(event.fullDocument.shard, shardId, event);
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+    // 'executionStats' explain output can only be captured for a non-invalidating stream.
+ const stats = coll.explain("executionStats").aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}},
+ userMatchExpr
+ ]);
+
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs0.name, expectedChangeStreamDocsReturned[0]);
+ assertNumChangeStreamDocsReturnedFromShard(
+ stats, st.rs1.name, expectedChangeStreamDocsReturned[1]);
+ assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogCursorReturnedDocs[0]);
+ assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogCursorReturnedDocs[1]);
+}
+
+// Open a change stream and store the resume token. This resume token will be used to replay the
+// stream after this point.
+const resumeAfterToken = coll.watch([]).getResumeToken();
+
+// These operations will create oplog events. The change stream will apply several filters to this
+// series of events and ensure that the '$match' expressions are rewritten correctly.
+assert.commandWorked(coll.insert({_id: 2, shard: 0}));
+assert.commandWorked(coll.insert({_id: 3, shard: 0}));
+assert.commandWorked(coll.insert({_id: 2, shard: 1}));
+assert.commandWorked(coll.insert({_id: 3, shard: 1}));
+assert.commandWorked(coll.replaceOne({_id: 2, shard: 0}, {_id: 2, shard: 0, foo: "a"}));
+assert.commandWorked(coll.replaceOne({_id: 3, shard: 0}, {_id: 3, shard: 0, foo: "a"}));
+assert.commandWorked(coll.replaceOne({_id: 2, shard: 1}, {_id: 2, shard: 1, foo: "a"}));
+assert.commandWorked(coll.replaceOne({_id: 3, shard: 1}, {_id: 3, shard: 1, foo: "a"}));
+assert.commandWorked(coll.update({_id: 2, shard: 0}, {$set: {foo: "b"}}));
+assert.commandWorked(coll.update({_id: 3, shard: 0}, {$set: {foo: "b"}}));
+assert.commandWorked(coll.update({_id: 2, shard: 1}, {$set: {foo: "b"}}));
+assert.commandWorked(coll.update({_id: 3, shard: 1}, {$set: {foo: "b"}}));
+
+// This helper will execute 'delete' operations to create oplog events. We defer executing the
+// 'delete' operations so that the post-image lookup for the 'update' events can still find the
+// relevant documents in the collection.
+const runDeleteOps = () => {
+ assert.commandWorked(coll.deleteOne({_id: 2, shard: 0}));
+ assert.commandWorked(coll.deleteOne({_id: 3, shard: 0}));
+ assert.commandWorked(coll.deleteOne({_id: 2, shard: 1}));
+ assert.commandWorked(coll.deleteOne({_id: 3, shard: 1}));
+};
+
+// This helper takes an operation 'op' and calls verifyOps() multiple times with 'op' to exercise
+// several different test cases.
+const runVerifyOpsTestcases = (op) => {
+ // 'delete' operations don't have a 'fullDocument' field, so we handle them as a special case.
+ if (op == "delete") {
+ // Test out the '{$exists: true}' predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$exists: true}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [0, 0] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$exists: false}' predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$exists: false}}},
+ [[op], [op], [op], [op]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ return;
+ }
+
+ // Initialize 'doc' so that it matches the 'fullDocument' field of one of the events where
+ // operationType == 'op'. For all 'insert' events, 'fullDocument' only has the '_id' field and
+ // the 'shard' field. For 'replace' and 'update' events, 'fullDocument' also has a 'foo' field.
+ const doc = {_id: 2, shard: 0};
+ if (op != "insert") {
+ doc.foo = (op == "replace" ? "a" : "b");
+ }
+
+    // Note: for operations of type 'update', the 'fullDocument' field is not populated until
+    // midway through the pipeline. We therefore cannot rewrite predicates on 'fullDocument' into
+    // the oplog for this operation type. As a result, the tests below verify that the number of
+    // documents returned by the oplog scan is different for updates than for other event types.
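+    //
+    // For illustration only: an update is logged in the oplog as a delta rather than a full
+    // document, e.g. roughly {op: "u", o: {<update description>}, o2: {_id: <id>, ...}}, so the
+    // post-image needed to evaluate 'fullDocument' predicates is not available at oplog-scan
+    // time.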
+
+ // Test out a predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: doc}},
+ [[op, 2, 0]],
+ [1, 0] /* expectedChangeStreamDocsReturned */,
+ op != "update" ? [1, 0] : [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a predicate on 'fullDocument._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "fullDocument._id": {$lt: 3}}},
+ [[op, 2, 0], [op, 2, 1]],
+ [1, 1] /* expectedChangeStreamDocsReturned */,
+ op != "update" ? [1, 1] : [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a predicate on 'fullDocument.shard'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "fullDocument.shard": {$gt: 0}}},
+ [[op, 2, 1], [op, 3, 1]],
+ [0, 2] /* expectedChangeStreamDocsReturned */,
+ op != "update" ? [0, 2] : [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$not: {$eq: doc}}}},
+ [[op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [1, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on 'fullDocument._id'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "fullDocument._id": {$not: {$lt: 3}}}},
+ [[op, 3, 0], [op, 3, 1]],
+ [1, 1] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out a negated predicate on 'fullDocument.shard'.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, "fullDocument.shard": {$not: {$gt: 0}}}},
+ [[op, 2, 0], [op, 3, 0]],
+ [2, 0] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$exists: true}' predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$exists: true}}},
+ [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]],
+ [2, 2] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+
+ // Test out the '{$exists: false}' predicate on the full 'fullDocument' field.
+ verifyOps(resumeAfterToken,
+ {$match: {operationType: op, fullDocument: {$exists: false}}},
+ [],
+ [0, 0] /* expectedChangeStreamDocsReturned */,
+ [2, 2] /* expectedOplogCursorReturnedDocs */);
+};
+
+// Verify that '$match' predicates on the 'update' operation type are rewritten correctly.
+// We intentionally do this before executing the 'delete' operations so that post-image lookup will
+// be successful.
+runVerifyOpsTestcases("update");
+
+// Now that we're done verifying 'update' events, we can execute the 'delete' operations.
+runDeleteOps();
+
+// Ensure that '$match' predicates on the 'insert', 'replace', and 'delete' operation types are
+// rewritten correctly.
+runVerifyOpsTestcases("insert");
+runVerifyOpsTestcases("replace");
+runVerifyOpsTestcases("delete");
+
+st.stop();
+})();
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js
new file mode 100644
index 00000000000..4972f1fa90a
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js
@@ -0,0 +1,601 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite predicates on
+// the namespace and apply them to oplog-format documents in order to filter out results as early
+// as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_and_rewrite";
+const collName = "coll1";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// Create a sharded collection.
+const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */);
+
+// A helper that opens a change stream on the whole cluster with the user-supplied match expression
+// 'userMatchExpr' and validates that:
+// 1. for each shard, the events are seen in the order specified in 'expectedResult'
+// 2. the filtering is done at the oplog level
+function verifyOnWholeCluster(
+ resumeAfterToken, userMatchExpr, expectedResult, expectedOplogRetDocsForEachShard) {
+ const cursor = db.getSiblingDB("admin").aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ]);
+
+ for (const [collOrDb, opDict] of Object.entries(expectedResult)) {
+ for (const [op, eventIdentifierList] of Object.entries(opDict)) {
+ eventIdentifierList.forEach(eventIdentifier => {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+
+ if (op == "dropDatabase") {
+ assert.eq(event.ns.db, eventIdentifier, event);
+ } else if (op == "insert") {
+ assert.eq(event.documentKey._id, eventIdentifier, event);
+ } else if (op == "rename") {
+ assert.eq(event.to.coll, eventIdentifier, event);
+ } else if (op == "drop") {
+ assert.eq(event.ns.coll, eventIdentifier);
+ } else {
+ assert(false, event);
+ }
+
+ if (op != "dropDatabase") {
+ assert.eq(event.ns.coll, collOrDb);
+ }
+ });
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+ const stats = db.getSiblingDB("admin").runCommand({
+ explain: {
+ aggregate: 1,
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ],
+ cursor: {batchSize: 0}
+ },
+ verbosity: "executionStats"
+ });
+
+ assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard);
+ assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard);
+}
+
+// Create some new collections to ensure that the test cases have sufficient namespaces to verify
+// that the namespace filtering is working correctly.
+const coll2 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll2", 4 /* splitAt */);
+const coll3 =
+ createShardedCollection(st, "_id" /* shardKey */, dbName, "coll.coll3", 6 /* splitAt */);
+const coll4 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll4", 10 /* splitAt */);
+
+// Open a change stream and store the resume token. This resume token will be used to replay the
+// stream after this point.
+const resumeAfterToken =
+ db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+// For each collection, insert 2 documents, one on each shard. These will create oplog events, and
+// the change stream will apply various namespace filters to these collections to verify that the
+// namespace predicates are rewritten correctly. Each document also contains field names matching
+// those of '$cmd' operations, i.e. 'renameCollection', 'drop' and 'dropDatabase', but with
+// non-string value types. The 'ns' match filters should gracefully handle the type mismatch and
+// not throw any error.
+//
+// Each of these inserted documents will be represented in this form in the oplog:
+// {... "o": {"_id": <id>, "renameCollection": true, "drop": {}, "dropDatabase": null}, ...}
+//
+// A few collections are renamed and dropped to verify that these are filtered properly.
+assert.commandWorked(coll.insert({_id: 1, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(coll.insert({_id: 2, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(coll2.insert({_id: 3, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(coll2.insert({_id: 4, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(coll2.renameCollection("newColl2"));
+assert.commandWorked(coll3.insert({_id: 5, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(coll3.insert({_id: 6, renameCollection: true, drop: {}, dropDatabase: null}));
+assert(coll3.drop());
+
+// Insert some documents into 'coll4' with field names that match known command types. Despite the
+// fact that these documents could potentially match the partial 'ns' filter we rewrite into the
+// oplog, the {op: "c"} predicate we add to the filter should ensure that they are correctly
+// discarded.
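+//
+// For illustration only: DDL operations are logged in the oplog as command entries of roughly the
+// form {op: "c", ns: "<db>.$cmd", o: {drop: <coll>, ...}}, whereas these inserts are logged with
+// op type "i"; requiring op == "c" in the rewritten filter is what keeps the two apart.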
+assert.commandWorked(coll4.insert(
+ {_id: 7, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1}));
+assert.commandWorked(coll4.insert({_id: 8, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(
+ coll4.insert({_id: 9, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""}));
+assert.commandWorked(coll4.insert(
+ {_id: 10, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1}));
+assert.commandWorked(coll4.insert({_id: 11, renameCollection: true, drop: {}, dropDatabase: null}));
+assert.commandWorked(
+ coll4.insert({_id: 12, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""}));
+
+// This group of tests ensures that a '$match' on a particular namespace object only sees that
+// namespace's documents and that only the required document(s) are returned by the oplog scan on
+// each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1"}}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll2"}}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll.coll3"}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a namespace with only the db component emits no documents and the
+// oplog returns no documents.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {ns: {db: dbName}}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a namespace object with an unknown collection matches no events and the oplog
+// cursor returns 0 documents.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "unknown"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a namespace object with flipped fields does not match any event's namespace and the
+// oplog cursor returns 0 documents.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {coll: "coll1", db: dbName}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a namespace object with extra fields does not match any event's namespace and the
+// oplog cursor returns 0 documents.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1", extra: "extra"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that an empty namespace object does not match any event's namespace and the oplog cursor
+// returns 0 documents.
+verifyOnWholeCluster(resumeAfterToken, {$match: {ns: {}}}, {}, 0);
+
+// Ensure that a '$match' on the namespace's db returns documents for all collections and the
+// oplog returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": dbName}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// These cases ensure that a '$match' on a regex over the namespace's db returns documents for all
+// collections and the oplog returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^change_stream_match_pushdown.*$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^(change_stream_match_pushdown.*$)/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /(^unknown$|^change_stream_match_pushdown.*$)/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": /^unknown$|^change_stream_match_pushdown.*$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a non-existent db returns no documents and the oplog returns no
+// documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.db": "unknown"}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on an empty db returns no documents and the oplog returns no documents
+// for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a subfield of db returns no documents and the oplog returns no
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db.extra": dbName}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that a '$match' on the collection field path emits only the
+// required documents and the oplog returns only the required document(s) for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll1"}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll2"}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "coll.coll3"}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that a '$match' on a regex over the collection field path emits
+// only the required documents and the oplog returns only the required document(s) for each
+// shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*1/}},
+ {coll1: {insert: [1, 2]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*2/}},
+ {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*3/}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that a '$match' on a regex matching all collections returns
+// documents from all collections and the oplog returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^CoLL.*/i}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a regex matching 3 collections returns documents from those
+// collections and the oplog returns the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^col.*1$|^col.*2$|^col.*3$/}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}
+ },
+ 5 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a regex excluding 'coll1', 'coll2' and 'coll4' returns only documents
+// from 'coll.coll3' and the oplog returns the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": /^coll[^124]/}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a non-existent collection returns no documents and the oplog returns
+// no documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": "unknown"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on an empty collection returns no documents and the oplog returns no
+// documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that a '$match' on a subfield of the collection returns no documents and the oplog
+// returns no documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll.extra": "coll1"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' on the db returns all documents and the oplog returns all documents for each
+// shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [dbName]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that '$in' on a regex matching the db name returns all documents
+// and the oplog returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [/^change_stream_match.*$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [/^change_stream_MATCH.*$/i]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that an empty '$in' on the db path matches no collections and the oplog returns no
+// documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"ns.db": {$in: []}}}, {}, 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' with an invalid db cannot be rewritten and the oplog returns all documents
+// for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: [dbName, 1]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' on the db path with a mix of string and regex can be rewritten and the oplog
+// returns 0 documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' on multiple collections returns the required documents and the oplog returns
+// the required documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns": {$in: [{db: dbName, coll: "coll1"}, {db: dbName, coll: "coll2"}]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", "coll2"]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' on regexes of multiple collections returns the required documents and the
+// oplog returns the required documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^coll1$/, /^coll2$/]}}},
+ {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}},
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that '$in' on a regex matching all collections returns all
+// documents and the oplog returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^coll.*$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: [/^COLL.*$/i]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that an empty '$in' matches no collections and the oplog returns no documents for each
+// shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: []}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' with an invalid collection cannot be rewritten and the oplog returns all
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", 1]}}},
+ {coll1: {insert: [1, 2]}},
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' with a mix of strings and regexes matching collections can be rewritten and
+// the oplog returns the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["coll1", /^coll.*3$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ },
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$in' with a mix of string and regex can be rewritten and the oplog returns 0
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that '$nin' on the db path returns all documents and the oplog
+// returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: []}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: ["unknown"]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [/^unknown$/]}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// This group of tests ensures that '$nin' matching the db name returns no documents and the oplog
+// returns 0 documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [dbName]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$nin' on multiple collections returns the required documents and the oplog returns
+// the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", "coll2", "coll4"]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$nin' on regexes of multiple collections returns the required documents and the
+// oplog returns the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: [/^coll1$/, /^coll2$/, /^coll4$/]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$nin' on regexes matching all collections returns no documents and the oplog
+// returns 0 documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: [/^coll.*$/, /^sys.*$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that an empty '$nin' matches all collections and the oplog returns all documents for
+// each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: []}}},
+ {
+ coll1: {insert: [1, 2]},
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ "coll4": {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$nin' with an invalid collection cannot be rewritten and the oplog returns all
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", 1]}}},
+ {
+ coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]},
+ "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]},
+ coll4: {insert: [7, 8, 9, 10, 11, 12]}
+ },
+ 8 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that '$nin' with a mix of string and regex can be rewritten and the oplog returns the
+// required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"ns.coll": {$nin: ["coll1", /^coll2$/, "coll4"]}}},
+ {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}},
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// At this stage, coll2 has been renamed to 'newColl2' and coll3 has been dropped. The tests from
+// here onwards drop the database and ensure that an 'ns' filter applied to a collection emits
+// only the 'drop' event for that collection and not the 'dropDatabase' event. Note that for
+// 'newColl2' and 'coll3', the 'dropDatabase' will be a no-op and will not emit any documents.
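+//
+// For illustration only: dropping a database is expected to surface in the stream as a 'drop'
+// event for each of its remaining collections, followed by a 'dropDatabase' event, which is why
+// a collection-level 'ns' filter below matches only that collection's 'drop' entries.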
+
+// Open a new change stream and verify that from here onwards the events related to 'dropDatabase'
+// are seen.
+const secondResumeAfterToken =
+ db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+assert.commandWorked(db.dropDatabase());
+
+// This group of tests ensures that the match on 'coll1' only sees the 'drop' events.
+verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {ns: {db: dbName, coll: "coll1"}}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {"ns.coll": "coll1"}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+verifyOnWholeCluster(secondResumeAfterToken,
+ {$match: {"ns.coll": /^col.*1/}},
+ {coll1: {drop: ["coll1", "coll1"]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+verifyOnWholeCluster(
+ secondResumeAfterToken,
+ {$match: {ns: {db: dbName}}},
+    {change_stream_match_pushdown_and_rewrite: {dropDatabase: [dbName, dbName]}},
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+st.stop();
+})();
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
new file mode 100644
index 00000000000..61ce95e1e79
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js
@@ -0,0 +1,283 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite predicates on
+// 'operationType' and apply them to oplog-format documents in order to filter out results as
+// early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_and_rewrite";
+const collName = "coll1";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// Create a sharded collection.
+const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */);
+
+// A helper that opens a change stream with the user-supplied match expression 'userMatchExpr' and
+// validates that:
+// 1. for each shard, the events are seen in the order specified in 'expectedOps'
+// 2. each shard returns the expected number of events
+// 3. the filtering is done at the oplog level
+//
+// Note that invalidating events cannot be tested by this function, since these will cause the
+// explain used to verify the oplog-level rewrites to fail.
+function verifyNonInvalidatingOps(
+ resumeAfterToken, userMatchExpr, expectedOps, expectedOplogRetDocsForEachShard) {
+ const cursor =
+ coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]);
+
+    // For shard1, the document id is '1' and for shard2, the document id is '2'.
+ const docIds = [1, 2];
+
+ for (const op of expectedOps) {
+ docIds.forEach(docId => {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+ assert.eq(event.documentKey._id, docId, event);
+ });
+ }
+
+ assert(!cursor.hasNext());
+
+    // An 'executionStats' explain can only be captured for a non-invalidating stream.
+ const stats = coll.explain("executionStats")
+ .aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]);
+
+ assertNumChangeStreamDocsReturnedFromShard(stats, st.rs0.name, expectedOps.length);
+ assertNumChangeStreamDocsReturnedFromShard(stats, st.rs1.name, expectedOps.length);
+
+ assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard);
+ assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard);
+}
+
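+// For intuition, the oplog identifies operations by single-letter 'op' codes, so a predicate on
+// 'operationType' can be rewritten roughly as follows (a sketch; the real rewrite is more
+// involved, e.g. 'update' and 'replace' are distinguished by the shape of the 'o' field):
+//   insert  -> {op: "i"}
+//   update  -> {op: "u"} with a modifier-style 'o' document
+//   replace -> {op: "u"} with a full replacement 'o' document
+//   delete  -> {op: "d"}
+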
+// Open a change stream and store the resume token. This resume token will be used to replay the
+// stream after this point.
+const resumeAfterToken = coll.watch([]).getResumeToken();
+
+// These operations will create oplog events. The change stream will apply several filters on this
+// series of events and ensure that the '$match' expressions are rewritten correctly.
+assert.commandWorked(coll.insert({_id: 1}));
+assert.commandWorked(coll.insert({_id: 2}));
+assert.commandWorked(coll.update({_id: 1}, {$set: {foo: "bar"}}));
+assert.commandWorked(coll.update({_id: 2}, {$set: {foo: "bar"}}));
+assert.commandWorked(coll.replaceOne({_id: 1}, {_id: 1, foo: "baz"}));
+assert.commandWorked(coll.replaceOne({_id: 2}, {_id: 2, foo: "baz"}));
+assert.commandWorked(coll.deleteOne({_id: 1}));
+assert.commandWorked(coll.deleteOne({_id: 2}));
+
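+// Each shard now holds exactly four relevant oplog entries for its document (insert, update,
+// replace and delete), which is why predicates that cannot be rewritten are expected to return
+// '4' oplog documents per shard below.
+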
+// Ensure that the '$match' on the 'insert' operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: "insert"}},
+ ["insert"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the 'update' operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: "update"}},
+ ["update"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the 'replace' operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: "replace"}},
+ ["replace"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the 'delete' operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: "delete"}},
+ ["delete"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on a numeric operation type is rewritten correctly and matches nothing.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: 1}},
+ [] /* expectedOps */,
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog format.
+// The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: "unknown"}},
+ [] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog
+// format. The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: ""}},
+ [] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the operation type with an inequality operator cannot be rewritten
+// to the oplog format. The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$gt: "insert"}}},
+ ["update", "replace"],
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on a sub-field of the operation type can be rewritten to the oplog
+// format. The oplog cursor should return '0' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {"operationType.subField": "subOperation"}},
+ [] /* expectedOps */,
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the operation type with '$in' is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$in: ["insert", "update"]}}},
+ ["insert", "update"],
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that for '$in' with one valid and one invalid operation type, the rewrite to the oplog
+// format is abandoned. The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$in: ["insert", "unknown"]}}},
+ ["insert"],
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The oplog
+// cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$in: ["unknown"]}}},
+ [] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' with '$in' with a numeric operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$in: [1]}}},
+ [] /* expectedOps */,
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' with '$in' with a mix of string and regex operation types cannot be
+// rewritten. The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$in: [/^insert$/, "update"]}}},
+ ["insert", "update"] /* expectedOps */,
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' on the operation type with '$nin' is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$nin: ["insert"]}}},
+ ["update", "replace", "delete"],
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that for '$nin' with one valid and one invalid operation type, the rewrite to the oplog
+// format is abandoned. The oplog cursor should return '4' documents for each shard.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$nin: ["insert", "unknown"]}}},
+ ["update", "replace", "delete"],
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' with '$nin' with a numeric operation type is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {operationType: {$nin: [1]}}},
+ ["insert", "update", "replace", "delete"],
+ 4 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' to match only 'insert' operations is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$operationType", "insert"]}}},
+ ["insert"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' to match only 'update' operations is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$operationType", "update"]}}},
+ ["update"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' to match only 'replace' operations is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$operationType", "replace"]}}},
+ ["replace"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' to match only 'delete' operations is rewritten correctly.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$operationType", "delete"]}}},
+ ["delete"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' is rewritten correctly when comparing with an 'unknown'
+// operation type.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {$match: {$expr: {$eq: ["$operationType", "unknown"]}}},
+ [] /* expectedOps */,
+ 0 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' is rewritten correctly when '$and' is in the expression.
+// Only 'delete' contains both of the substrings 'l' and 'te'.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $and: [
+ {$gte: [{$indexOfCP: ["$operationType", "l"]}, 0]},
+ {$gte: [{$indexOfCP: ["$operationType", "te"]}, 0]}
+ ]
+ }
+ }
+ },
+ ["delete"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' is rewritten correctly when '$or' is in the expression.
+// 'update', 'replace' and 'delete' each contain 'l' or 'te'; 'insert' contains neither.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $or: [
+ {$gte: [{$indexOfCP: ["$operationType", "l"]}, 0]},
+ {$gte: [{$indexOfCP: ["$operationType", "te"]}, 0]}
+ ]
+ }
+ }
+ },
+ ["update", "replace", "delete"],
+ 3 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' is rewritten correctly when '$not' is in the expression.
+// The regex /e$/ matches 'update', 'replace' and 'delete', so only 'insert' remains.
+verifyNonInvalidatingOps(
+ resumeAfterToken,
+ {$match: {$expr: {$not: {$regexMatch: {input: "$operationType", regex: /e$/}}}}},
+ ["insert"],
+ 1 /* expectedOplogRetDocsForEachShard */);
+
+// Ensure that the '$match' using '$expr' is rewritten correctly when a 'nor' ({$not: {$or: [...]}})
+// is in the expression.
+verifyNonInvalidatingOps(resumeAfterToken,
+ {
+ $match: {
+ $expr: {
+ $not: {
+ $or: [
+ {$eq: ["$operationType", "insert"]},
+ {$eq: ["$operationType", "delete"]},
+ ]
+ }
+ }
+ }
+ },
+ ["update", "replace"],
+ 2 /* expectedOplogRetDocsForEachShard */);
+
+st.stop();
+})();
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js
new file mode 100644
index 00000000000..d0254ce9145
--- /dev/null
+++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js
@@ -0,0 +1,442 @@
+// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the 'to' field
+// and apply it to oplog-format documents in order to filter out results as early as possible.
+// @tags: [
+// featureFlagChangeStreamsRewrite,
+// requires_fcv_51,
+// requires_pipeline_optimization,
+// requires_sharding,
+// uses_change_streams,
+// change_stream_does_not_expect_txns,
+// assumes_unsharded_collection,
+// assumes_read_preference_unchanged
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers.
+
+const dbName = "change_stream_match_pushdown_and_rewrite";
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+
+// A helper that opens a change stream on the whole cluster with the user-supplied match expression
+// 'userMatchExpr' and validates that:
+// 1. for each shard, the events are seen in the order specified in 'expectedResult'
+// 2. the filtering is done at the oplog level
+function verifyOnWholeCluster(
+ resumeAfterToken, userMatchExpr, expectedResult, expectedOplogRetDocsForEachShard) {
+ const cursor = db.getSiblingDB("admin").aggregate([
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ]);
+
+ for (const [coll, opDict] of Object.entries(expectedResult)) {
+ for (const [op, eventIdentifierList] of Object.entries(opDict)) {
+ eventIdentifierList.forEach(eventIdentifier => {
+ assert.soon(() => cursor.hasNext());
+ const event = cursor.next();
+ assert.eq(event.operationType, op, event);
+
+ if (op == "insert") {
+ assert.eq(event.documentKey._id, eventIdentifier, event);
+ } else if (op == "rename") {
+ assert.eq(event.to.coll, eventIdentifier, event);
+ } else {
+ assert(false, event);
+ }
+ assert.eq(event.ns.coll, coll);
+ });
+ }
+ }
+
+ assert(!cursor.hasNext());
+
+ const stats = db.getSiblingDB("admin").runCommand({
+ explain: {
+ aggregate: 1,
+ pipeline: [
+ {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}},
+ userMatchExpr
+ ],
+ cursor: {batchSize: 0}
+ },
+ verbosity: "executionStats"
+ });
+
+ assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard);
+ assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard);
+}
+
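+// Note: 'assertNumMatchingOplogEventsForShard' (from change_stream_rewrite_util.js) is assumed to
+// read each shard's executionStats from the explain output above and to compare the number of
+// oplog documents surviving the rewritten filter against the expected count.
+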
+// Create some new collections to ensure that the test cases have sufficient namespaces to verify
+// that the filtering on the 'to' field works correctly.
+const coll1 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll1", 5 /* splitAt */);
+const coll2 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll2", 9 /* splitAt */);
+
+const ns1 = dbName + ".newColl1";
+const ns2 = dbName + ".newColl2";
+
+// Open a change stream and store the resume token. This resume token will be used to replay the
+// stream after this point.
+const resumeAfterToken =
+ db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken();
+
+// The inserted documents purposely contain the field name 'o.to' to match the oplog's 'to' field.
+// These fields should not interfere with the rewritten predicate, and only the expected documents
+// should be returned at the oplog level.
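+//
+// For reference, a rename produces a command-format oplog entry shaped roughly like (a sketch;
+// exact fields may vary by server version):
+//   {op: "c", ns: "<db>.$cmd", o: {renameCollection: "<db>.coll1", to: "<db>.newColl1", ...}}
+// so the rewritten 'to' predicate targets the oplog's 'o.to' field, which the literal "o.to" field
+// names in the documents below are designed to collide with.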
+assert.commandWorked(coll1.insert({_id: 3, "o.to": 3}));
+assert.commandWorked(coll1.insert({_id: 4, "o.to": ""}));
+assert.commandWorked(coll1.insert({_id: 5, "o.to": ns2}));
+assert.commandWorked(coll1.insert({_id: 6, "o.to": dbName}));
+assert.commandWorked(coll1.renameCollection("newColl1"));
+assert.commandWorked(coll2.insert({_id: 7, "o.to": 3}));
+assert.commandWorked(coll2.insert({_id: 8, "o.to": ""}));
+assert.commandWorked(coll2.insert({_id: 9, "o.to": ns1}));
+assert.commandWorked(coll2.insert({_id: 10, "o.to": dbName}));
+assert.commandWorked(coll2.renameCollection("newColl2"));
+
+// This group of tests ensures that a '$match' on the full 'to' object sees only the matching
+// rename events and that only the required document(s) are returned at the oplog for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {to: {db: dbName, coll: "newColl1"}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {to: {db: dbName, coll: "newColl2"}}},
+ {coll2: {rename: ["newColl2", "newColl2"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on the 'to' object with only the db component does not emit any documents
+// and the oplog does not return any documents; exact equality requires both 'db' and 'coll'.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {to: {db: dbName}}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a 'to' object with an unknown collection does not match any event and the oplog
+// cursor returns '0' documents.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {to: {db: dbName, coll: "unknown"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that the 'to' object with flipped fields does not match and the oplog cursor returns '0'
+// documents; whole-object equality is sensitive to field order.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {to: {coll: "newColl1", db: dbName}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that the 'to' object with extra fields does not match and the oplog cursor returns '0'
+// documents.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {to: {db: dbName, coll: "newColl1", extra: "extra"}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that an empty 'to' object does not match and the oplog cursor returns '0' documents.
+verifyOnWholeCluster(
+    resumeAfterToken, {$match: {to: {}}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on the 'to.db' field path returns only documents with the 'rename' op
+// type for all collections, and that the oplog returns the same for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": dbName}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": /^change_stream_match_pushdown.*$/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": /^(change_stream_match_pushdown.*$)/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": /(^unknown$|^change_stream_match_pushdown.*$)/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": /^unknown$|^change_stream_match_pushdown.*$/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on a non-existent db does not return any documents and the oplog does not
+// return any documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"to.db": "unknown"}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on an empty db name does not return any documents and the oplog does not
+// return any documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"to.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on a sub-field of 'to.db' does not return any documents and the oplog
+// does not return any documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db.extra": dbName}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// This group of tests ensures that a '$match' on the 'to.coll' field path emits only the required
+// documents and the oplog returns only the required document(s) for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": "newColl1"}},
+ {coll1: {rename: ["newColl1", "newColl1"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": "newColl2"}},
+ {coll2: {rename: ["newColl2", "newColl2"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": /^newColl.*1/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": /^newColl.*2/}},
+ {coll2: {rename: ["newColl2", "newColl2"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+
+// This group of tests ensures that a '$match' with a regex matching all collections returns all
+// documents with the 'rename' op type and the oplog returns the same for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": /^newColl.*/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": /^newColl.*/i}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": /^newColl.*1$|^newColl.*2$/}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' with a regex excluding 'newColl1' returns only documents from 'coll2' and
+// the oplog returns the required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": /^newColl[^1]/}},
+ {coll2: {rename: ["newColl2", "newColl2"]}},
+ 1 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on a non-existent collection does not return any documents and the oplog
+// does not return any documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": "unknown"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on an empty collection name does not return any documents and the oplog
+// does not return any documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"to.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that a '$match' on a sub-field of 'to.coll' does not return any documents and the oplog
+// does not return any documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll.extra": "coll1"}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' on the db returns all documents with the 'rename' op type and the oplog
+// returns the same for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": {$in: [dbName, "unknown"]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": {$in: [/^change_stream_match.*$/, /^unknown$/]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": {$in: [/^change_stream_MATCH.*$/i, /^unknown$/i]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that an empty '$in' on the db path does not match any collection and the oplog does not
+// return any documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken, {$match: {"to.db": {$in: []}}}, {}, 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' with an invalid (numeric) db entry cannot be rewritten and the oplog returns
+// all documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": {$in: [dbName, 1]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 6 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' on the db path with a mix of string and regex can be rewritten and the oplog
+// returns '0' documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' on multiple collections returns the required documents and the oplog returns
+// the required documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to": {$in: [{db: dbName, coll: "newColl1"}, {db: dbName, coll: "newColl2"}]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": {$in: ["newColl1", "newColl2"]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": {$in: [/^newColl1$/, /^newColl2$/]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// This group of tests ensures that '$in' with regexes matching all collections returns all
+// documents with the 'rename' op type and the oplog returns the same for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": {$in: [/^newColl.*$/, /^unknown.*/]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": {$in: [/^newColl.*$/i, /^unknown/i]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that an empty '$in' does not match any collection and the oplog does not return any
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$in: []}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' with an invalid (numeric) collection entry cannot be rewritten and the oplog
+// returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$in: ["newColl1", 1]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}},
+ 6 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' with a mix of string and regex matching the collections can be rewritten and
+// the oplog returns the required documents for each shard.
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.coll": {$in: ["newColl1", /^newColl.*2$/]}}},
+ {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}},
+ 2 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$in' with a mix of string and regex can be rewritten and the oplog returns '0'
+// documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$in: ["unknown1", /^unknown2$/]}}},
+ {},
+ 0 /* expectedOplogRetDocsForEachShard*/);
+
+// This group of tests ensures that '$nin' on the db path returns all documents and the oplog
+// returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db": {$nin: []}}},
+ {
+ coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 6 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db": {$nin: ["unknown1", "unknown2"]}}},
+ {
+ coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 6 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db": {$nin: [/^unknown1$/, /^unknown2$/]}}},
+ {
+ coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 6 /* expectedOplogRetDocsForEachShard*/);
+
+// This group of tests ensures that '$nin' on the matching db name does not return any documents
+// with the 'rename' op type and the oplog returns the same for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.db": {$nin: [dbName, "unknown"]}}},
+ {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}},
+ 4 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(
+ resumeAfterToken,
+ {$match: {"to.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/, /^unknown.*/]}}},
+ {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}},
+ 4 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$nin' on a collection returns only the 'insert' events for that collection (its
+// 'rename' events are filtered out) and the oplog returns the same for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: ["newColl1", "unknown"]}}},
+ {
+ coll1: {insert: [3, 4, 5, 6]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 5 /* expectedOplogRetDocsForEachShard*/);
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: [/^newColl1$/, /^unknown$/]}}},
+ {
+ coll1: {insert: [3, 4, 5, 6]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 5 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$nin' with a regex matching all collections returns only documents with the
+// 'insert' op type and the oplog returns the same for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: [/^newColl.*$/, /^unknown.*$/]}}},
+ {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}},
+ 4 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that an empty '$nin' matches all collections and the oplog returns all documents for each
+// shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: []}}},
+ {
+ coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 6 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$nin' with an invalid (numeric) collection entry cannot be rewritten and the oplog
+// returns all documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: ["newColl1", 1]}}},
+ {
+ coll1: {insert: [3, 4, 5, 6]},
+ coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]}
+ },
+ 6 /* expectedOplogRetDocsForEachShard*/);
+
+// Ensure that '$nin' with a mix of string and regex can be rewritten and the oplog returns the
+// required documents for each shard.
+verifyOnWholeCluster(resumeAfterToken,
+ {$match: {"to.coll": {$nin: ["newColl1", /^newColl2$/]}}},
+ {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}},
+ 4 /* expectedOplogRetDocsForEachShard*/);
+
+st.stop();
+})();