author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-09-16 11:44:08 +0000
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-16 13:53:02 +0000
commit | aa36306977363038be233195ac5645edb3d71dab (patch) |
tree | 59f8a1e7c258f44e0985bcea48ff4dac7cfe6d4c | /jstests/change_streams
parent | 9caae4e9c3cc4c0dc27967ad9e0f84284087a8de (diff) |
download | mongo-aa36306977363038be233195ac5645edb3d71dab.tar.gz |
SERVER-59457 [Part2] Allow change stream rewrite tests to run in whole-db and whole-cluster passthrough
Diffstat (limited to 'jstests/change_streams')
6 files changed, 1915 insertions, 0 deletions
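All of the new tests below follow the same verification pattern: run a pipeline of the form [{$changeStream: {...}}, {$match: ...}], consume the expected events, then re-run the same pipeline under explain("executionStats") and count how many documents each shard's oplog cursor returned, which shows whether the $match predicate was rewritten into the oplog scan. The following is a condensed, illustrative sketch of that pattern only; it assumes a ShardingTest 'st' and a sharded collection 'coll' set up as in the tests below, and uses the helpers loaded from jstests/libs/change_stream_rewrite_util.js. The expected counts shown are placeholders, not assertions from any specific test.

```javascript
// Illustrative sketch of the verification pattern used by the tests in this commit.
// Assumes 'st' (a ShardingTest) and a sharded collection 'coll' already exist,
// as they do in each test file below.
load("jstests/libs/change_stream_rewrite_util.js");  // For the assertNum* helpers.

// Capture a resume token so the same window of events can be replayed later.
const resumeAfterToken = coll.watch([]).getResumeToken();

// Generate a few oplog events, only some of which match the predicate under test.
assert.commandWorked(coll.insert({_id: 1}));
assert.commandWorked(coll.update({_id: 1}, {$set: {foo: "bar"}}));
assert.commandWorked(coll.remove({_id: 1}));

// Consume the matching events through the change stream.
const changeStream = coll.aggregate(
    [{$changeStream: {resumeAfter: resumeAfterToken}}, {$match: {operationType: "insert"}}]);
assert.soon(() => changeStream.hasNext());
assert.eq(changeStream.next().operationType, "insert");
changeStream.close();

// Re-run the same pipeline under explain and check how many documents each shard's
// oplog cursor returned. If the $match was rewritten into the oplog scan, the
// non-matching entries are filtered out before reaching the change stream stages.
const stats = coll.explain("executionStats").aggregate(
    [{$changeStream: {resumeAfter: resumeAfterToken}}, {$match: {operationType: "insert"}}]);
assertNumMatchingOplogEventsForShard(stats, st.rs0.name, 1 /* illustrative count */);
assertNumChangeStreamDocsReturnedFromShard(stats, st.rs0.name, 1 /* illustrative count */);
```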
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js new file mode 100644 index 00000000000..76c4ed2d1f2 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_basic_match_pushdown_rewrite.js @@ -0,0 +1,179 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the $match and +// apply it to oplog-format documents in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_and_rewrite"; +const collName = "coll1"; +const collNameAlternate = "change_stream_match_pushdown_and_rewrite_alternate"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// Create a sharded collection. +const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */); + +// Create a second (unsharded) test collection for validating transactions that insert into multiple +// collections. +assert.commandWorked(db.createCollection(collNameAlternate)); + +const changeStream = coll.aggregate([{$changeStream: {}}, {$match: {operationType: "insert"}}]); + +// Store a resume token that can be used to start the change stream from the beginning. +const resumeAfterToken = changeStream.getResumeToken(); + +// These commands will result in 6 oplog events, with all but 2 will be filtered out by the +// $match. +assert.commandWorked(coll.insert({_id: 1, string: "Value"})); +assert.commandWorked(coll.update({_id: 1}, {$set: {foo: "bar"}})); +assert.commandWorked(coll.remove({_id: 1})); +assert.commandWorked(coll.insert({_id: 2, string: "vAlue"})); +assert.commandWorked(coll.update({_id: 2}, {$set: {foo: "bar"}})); +assert.commandWorked(coll.remove({_id: 2})); + +// Verify correct operation of the change stream. +assert.soon(() => changeStream.hasNext()); +const event1 = changeStream.next(); +assert.eq(event1.operationType, "insert", event1); +assert.eq(event1.documentKey._id, 1, event1); + +assert.soon(() => changeStream.hasNext()); +const event2 = changeStream.next(); +assert.eq(event2.operationType, "insert", event2); +assert.eq(event2.documentKey._id, 2, event2); + +assert(!changeStream.hasNext()); +changeStream.close(); + +// Run the same change stream again, this time with "executionStats" to get more detailed +// information about how many documents are processed by each stage. Note that 'event1' will not +// be included in the results set this time, because we are using it as the resume point, but +// will still be returned from the shard and get processed on the mongoS. +const stats = coll.explain("executionStats").aggregate([ + {$changeStream: {resumeAfter: event1._id}}, + {$match: {operationType: "insert"}} +]); + +// Verify the number of documents seen from each shard by the mongoS pipeline. Because we expect +// the $match to be pushed down to the shards, we expect to only see the 1 "insert" operation on +// each shard. 
All other operations should be filtered out on the shards. +assertNumChangeStreamDocsReturnedFromShard(stats, st.rs0.name, 1); +assertNumChangeStreamDocsReturnedFromShard(stats, st.rs1.name, 1); + +// Because it is possible to rewrite the {operationType: "insert"} predicate so that it applies +// to the oplog entry, we expect the $match to get pushed all the way to the initial oplog +// query. This query executes in an internal "$cursor" stage, and we expect to see exactly 1 +// document from this stage on each shard. +assertNumMatchingOplogEventsForShard(stats, st.rs0.name, 1); +assertNumMatchingOplogEventsForShard(stats, st.rs1.name, 1); + +// Generate another 7 oplog events, this time within a transaction. One of the events is in a +// different collection, to validate that events from outside the watched namespace get filtered +// out even when within a transaction. +const session = st.s.startSession({causalConsistency: true}); +const sessionColl = session.getDatabase(dbName)[collName]; +const sessionCollAlternate = session.getDatabase(dbName)[collNameAlternate]; + +session.startTransaction({readConcern: {level: "majority"}}); + +assert.commandWorked(sessionColl.insert({_id: 1})); +assert.commandWorked(sessionColl.update({_id: 1}, {$set: {foo: "bar"}})); +assert.commandWorked(sessionColl.remove({_id: 1})); +assert.commandWorked(sessionCollAlternate.insert({_id: "alt"})); +assert.commandWorked(sessionColl.insert({_id: 2})); +assert.commandWorked(sessionColl.update({_id: 2}, {$set: {foo: "bar"}})); +assert.commandWorked(sessionColl.remove({_id: 2})); + +assert.commandWorked(session.commitTransaction_forTesting()); + +// Repeat the change stream from before, using a resume token to pick up from where the previous +// change stream left off. This change stream will only observe the 6 operations that occur in the +// transaction and will filter out everything except the 2 inserts. +const txnChangeStream = coll.aggregate( + [{$changeStream: {resumeAfter: event2._id}}, {$match: {operationType: "insert"}}]); + +// Verify correct operation of the change stream. +assert.soon(() => txnChangeStream.hasNext()); +const event3 = txnChangeStream.next(); +assert.eq(event3.operationType, "insert", event3); +assert.eq(event3.documentKey._id, 1, event3); + +assert.soon(() => txnChangeStream.hasNext()); +const event4 = txnChangeStream.next(); +assert.eq(event4.operationType, "insert", event4); +assert.eq(event4.documentKey._id, 2, event4); + +assert(!txnChangeStream.hasNext()); +txnChangeStream.close(); + +// Run explain on the change stream to get more detailed execution information. +const txnStatsAfterEvent2 = coll.explain("executionStats").aggregate([ + {$changeStream: {resumeAfter: event2._id}}, + {$match: {operationType: "insert"}} +]); + +// Verify the number of documents seen from each shard by the mongoS pipeline. As before, we expect +// that everything except the inserts will be filtered on the shard, limiting the number of events +// the mongoS needs to retrieve. +assertNumChangeStreamDocsReturnedFromShard(txnStatsAfterEvent2, st.rs0.name, 1); + +// Note that the event we are resuming from is sent to the mongoS from shard 2, even though it gets +// filtered out, which is why we see 2 events here. +assertNumChangeStreamDocsReturnedFromShard(txnStatsAfterEvent2, st.rs1.name, 2); + +// Generate a second transaction. 
+session.startTransaction({readConcern: {level: "majority"}}); + +assert.commandWorked(sessionColl.insert({_id: 1, a: 1, string: "vaLue"})); +assert.commandWorked(sessionColl.update({_id: 1}, {$set: {foo: "bar"}})); +assert.commandWorked(sessionColl.remove({_id: 1})); +assert.commandWorked(sessionColl.insert({_id: 2, a: 2, string: "valUe"})); +assert.commandWorked(sessionColl.update({_id: 2}, {$set: {foo: "bar"}})); +assert.commandWorked(sessionColl.remove({_id: 2})); + +assert.commandWorked(session.commitTransaction_forTesting()); + +// This change stream targets transactions from this session but filters out the first transaction. +const txnStatsAfterEvent1 = coll.explain("executionStats").aggregate([ + {$changeStream: {resumeAfter: event1._id}}, + {$match: {operationType: "insert", lsid: event3.lsid, txnNumber: {$ne: event3.txnNumber}}} +]); + +// The "lsid" and "txnNumber" filters should get pushed all the way to the initial oplog query +// in the $cursor stage, meaning that every oplog entry gets filtered out except the +// 'commitTransaction' on each shard for the one transaction we select with our filter. +assertNumMatchingOplogEventsForShard(txnStatsAfterEvent1, st.rs0.name, 1); +assertNumMatchingOplogEventsForShard(txnStatsAfterEvent1, st.rs1.name, 1); + +// Ensure that optimization does not attempt to create a filter that disregards the collation. +const collationChangeStream = coll.aggregate( + [{$changeStream: {resumeAfter: resumeAfterToken}}, {$match: {"fullDocument.string": "value"}}], + {collation: {locale: "en_US", strength: 2}}); + +["Value", "vAlue", "vaLue", "valUe"].forEach(val => { + assert.soon(() => collationChangeStream.hasNext()); + const fullDocumentEvent = collationChangeStream.next(); + assert.eq(fullDocumentEvent.fullDocument.string, val, fullDocumentEvent); +}); + +assert(!collationChangeStream.hasNext()); +collationChangeStream.close(); +st.stop(); +})(); diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js new file mode 100644 index 00000000000..add6cd1b1bc --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_documentKey_rewrite.js @@ -0,0 +1,199 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate +// involving the 'documentKey' field can push down the $match and rewrite the $match and make it +// part of the oplog cursor's filter in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_documentKey_rewrite"; +const collName = "change_stream_match_pushdown_documentKey_rewrite"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// Returns a newly created sharded collection, where shard key is 'shard'. 
+const coll = createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */); + +// A helper that opens a change stream with the user supplied match expression 'userMatchExpr' and +// validates that: +// (1) for each shard, the events are seen in that order as specified in 'expectedOps'; and +// (2) each shard returns the expected number of events; and +// (3) the number of docs returned by the oplog cursor on each shard matches what we expect +// as specified in 'expectedOplogCursorReturnedDocs'. +function verifyOps(resumeAfterToken, userMatchExpr, expectedOps, expectedOplogCursorReturnedDocs) { + const cursor = + coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]); + + let expectedChangeStreamDocsReturned = [0, 0]; + for (const [op, id, shardId] of expectedOps) { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + assert.eq(event.documentKey._id, id, event); + assert.eq(event.documentKey.shard, shardId, event); + if (shardId == 0 || shardId == 1) { + ++expectedChangeStreamDocsReturned[shardId]; + } + } + + assert(!cursor.hasNext()); + + // An 'executionStats' could only be captured for a non-invalidating stream. + const stats = coll.explain("executionStats") + .aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]); + + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs0.name, expectedChangeStreamDocsReturned[0]); + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs1.name, expectedChangeStreamDocsReturned[1]); + assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogCursorReturnedDocs[0]); + assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogCursorReturnedDocs[1]); +} + +// Open a change stream and store the resume token. This resume token will be used to replay the +// stream after this point. +const resumeAfterToken = coll.watch([]).getResumeToken(); + +// These operations will create oplog events. The change stream will apply several filters on these +// series of events and ensure that the '$match' expressions are rewritten correctly. +assert.commandWorked(coll.insert({_id: 2, shard: 0})); +assert.commandWorked(coll.insert({_id: 3, shard: 0, z: 4})); +assert.commandWorked(coll.insert({_id: 2, shard: 1, z: 4})); +assert.commandWorked(coll.insert({_id: 3, shard: 1})); +assert.commandWorked(coll.update({_id: 2, shard: 0}, {$set: {foo: "a"}})); +assert.commandWorked(coll.update({_id: 3, shard: 0}, {$set: {foo: "a"}})); +assert.commandWorked(coll.update({_id: 2, shard: 1}, {$set: {foo: "a"}})); +assert.commandWorked(coll.update({_id: 3, shard: 1}, {$set: {foo: "a"}})); +assert.commandWorked(coll.replaceOne({_id: 2, shard: 0}, {_id: 2, shard: 0, foo: "b"})); +assert.commandWorked(coll.replaceOne({_id: 3, shard: 0}, {_id: 3, shard: 0, z: 4, foo: "b"})); +assert.commandWorked(coll.replaceOne({_id: 2, shard: 1}, {_id: 2, shard: 1, z: 4, foo: "b"})); +assert.commandWorked(coll.replaceOne({_id: 3, shard: 1}, {_id: 3, shard: 1, foo: "b"})); +assert.commandWorked(coll.deleteOne({_id: 2, shard: 0})); +assert.commandWorked(coll.deleteOne({_id: 3, shard: 0})); +assert.commandWorked(coll.deleteOne({_id: 2, shard: 1})); +assert.commandWorked(coll.deleteOne({_id: 3, shard: 1})); + +// Enable a failpoint that will prevent $expr match expressions from generating $_internalExprEq +// or similar expressions. This ensures that the following test-cases only exercise the $expr +// rewrites. 
+assert.commandWorked( + db.adminCommand({configureFailPoint: "disableMatchExpressionOptimization", mode: "alwaysOn"})); + +// Ensure that the '$match' on the 'insert', 'update', 'replace', and 'delete' operation types with +// various predicates are rewritten correctly. +for (const op of ["insert", "update", "replace", "delete"]) { + // Test out a predicate on the full 'documentKey' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, documentKey: {shard: 0, _id: 2}}}, + [[op, 2, 0]], + [1, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out a predicate on 'documentKey._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey._id": 2}}, + [[op, 2, 0], [op, 2, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a predicate on 'documentKey.shard'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey.shard": 1}}, + [[op, 2, 1], [op, 3, 1]], + [0, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on the full 'documentKey' field. It's not possible to rewrite + // this predicate and make it part of the oplog filter, so we expect the oplog cursor to return + // 2 docs on each shard. + verifyOps(resumeAfterToken, + {$match: {operationType: op, documentKey: {$not: {$eq: {shard: 0, _id: 2}}}}}, + [[op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on 'documentKey._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey._id": {$not: {$eq: 2}}}}, + [[op, 3, 0], [op, 3, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on 'documentKey.shard'. It's not possible to rewrite this + // predicate and make it part of the oplog filter, so we expect the oplog cursor to return 2 + // docs on each shard. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey.shard": {$not: {$eq: 1}}}}, + [[op, 2, 0], [op, 3, 0]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out the '{$exists: false}' predicate on a field that doesn't exist in 'documentKey' but + // that does exist in some of the underlying documents. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "documentKey.z": {$exists: false}}}, + [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out an $expr predicate on the full 'documentKey' field. + verifyOps( + resumeAfterToken, + { + $match: + {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey", {shard: 0, _id: 2}]}}]} + }, + [[op, 2, 0]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on the full 'documentKey' field. + verifyOps(resumeAfterToken, + { + $match: { + $and: [ + {operationType: op}, + {$expr: {$not: {$eq: ["$documentKey", {shard: 0, _id: 2}]}}} + ] + } + }, + [[op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out an $expr predicate on 'documentKey._id'. + verifyOps(resumeAfterToken, + {$match: {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey._id", 2]}}]}}, + [[op, 2, 0], [op, 2, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $expr predicate on 'documentKey._id'. + verifyOps( + resumeAfterToken, + {$match: {$and: [{operationType: op}, {$expr: {$not: {$eq: ["$documentKey._id", 2]}}}]}}, + [[op, 3, 0], [op, 3, 1]], + [1, 1] /* expectedOplogCursorReturnedDocs */); + + // Test out an $expr predicate on 'documentKey.shard'. 
+ verifyOps(resumeAfterToken, + {$match: {$and: [{operationType: op}, {$expr: {$eq: ["$documentKey.shard", 1]}}]}}, + [[op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated $expr predicate on 'documentKey.shard'. + verifyOps( + resumeAfterToken, + {$match: {$and: [{operationType: op}, {$expr: {$not: {$eq: ["$documentKey.shard", 1]}}}]}}, + [[op, 2, 0], [op, 3, 0]], + [2, 2] /* expectedOplogCursorReturnedDocs */); +} + +st.stop(); +})(); diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js new file mode 100644 index 00000000000..35986a01d95 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_fullDocument_rewrite.js @@ -0,0 +1,211 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: <predicate>}] with a predicate +// involving the 'fullDocument' field can push down the $match and rewrite the $match and make it +// part of the oplog cursor's filter in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_fullDocument_rewrite"; +const collName = "change_stream_match_pushdown_fullDocument_rewrite"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +// Create a sharded collection where shard key is 'shard'. +const coll = createShardedCollection(st, "shard" /* shardKey */, dbName, collName, 1 /* splitAt */); + +// A helper that opens a change stream with the user supplied match expression 'userMatchExpr' and +// validates that: +// (1) for each shard, the events are seen in that order as specified in 'expectedOps'; and +// (2) the number of docs returned by each shard matches what we expect as specified by +// 'expectedChangeStreamDocsReturned'; and +// (3) the number of docs returned by the oplog cursor on each shard matches what we expect as +// specified in 'expectedOplogCursorReturnedDocs'. +function verifyOps(resumeAfterToken, + userMatchExpr, + expectedOps, + expectedChangeStreamDocsReturned, + expectedOplogCursorReturnedDocs) { + const cursor = coll.aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}}, + userMatchExpr + ]); + + for (const [op, id, shardId] of expectedOps) { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + if (id !== undefined) { + assert.eq(event.fullDocument._id, id, event); + } + if (shardId !== undefined) { + assert.eq(event.fullDocument.shard, shardId, event); + } + } + + assert(!cursor.hasNext()); + + // An 'executionStats' could only be captured for a non-invalidating stream. 
+ const stats = coll.explain("executionStats").aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken, fullDocument: "updateLookup"}}, + userMatchExpr + ]); + + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs0.name, expectedChangeStreamDocsReturned[0]); + assertNumChangeStreamDocsReturnedFromShard( + stats, st.rs1.name, expectedChangeStreamDocsReturned[1]); + assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogCursorReturnedDocs[0]); + assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogCursorReturnedDocs[1]); +} + +// Open a change stream and store the resume token. This resume token will be used to replay the +// stream after this point. +const resumeAfterToken = coll.watch([]).getResumeToken(); + +// These operations will create oplog events. The change stream will apply several filters on these +// series of events and ensure that the '$match' expressions are rewritten correctly. +assert.commandWorked(coll.insert({_id: 2, shard: 0})); +assert.commandWorked(coll.insert({_id: 3, shard: 0})); +assert.commandWorked(coll.insert({_id: 2, shard: 1})); +assert.commandWorked(coll.insert({_id: 3, shard: 1})); +assert.commandWorked(coll.replaceOne({_id: 2, shard: 0}, {_id: 2, shard: 0, foo: "a"})); +assert.commandWorked(coll.replaceOne({_id: 3, shard: 0}, {_id: 3, shard: 0, foo: "a"})); +assert.commandWorked(coll.replaceOne({_id: 2, shard: 1}, {_id: 2, shard: 1, foo: "a"})); +assert.commandWorked(coll.replaceOne({_id: 3, shard: 1}, {_id: 3, shard: 1, foo: "a"})); +assert.commandWorked(coll.update({_id: 2, shard: 0}, {$set: {foo: "b"}})); +assert.commandWorked(coll.update({_id: 3, shard: 0}, {$set: {foo: "b"}})); +assert.commandWorked(coll.update({_id: 2, shard: 1}, {$set: {foo: "b"}})); +assert.commandWorked(coll.update({_id: 3, shard: 1}, {$set: {foo: "b"}})); + +// This helper will execute 'delete' operations to create oplog events. We defer executing the +// 'delete' operations so that the 'update' operations may look up the relevant document in the +// collection. +const runDeleteOps = () => { + assert.commandWorked(coll.deleteOne({_id: 2, shard: 0})); + assert.commandWorked(coll.deleteOne({_id: 3, shard: 0})); + assert.commandWorked(coll.deleteOne({_id: 2, shard: 1})); + assert.commandWorked(coll.deleteOne({_id: 3, shard: 1})); +}; + +// This helper takes an operation 'op' and calls verifyOps() multiple times with 'op' to exercise +// several different testcases. +const runVerifyOpsTestcases = (op) => { + // 'delete' operations don't have a 'fullDocument' field, so we handle them as a special case. + if (op == "delete") { + // Test out the '{$exists: true}' predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: {$exists: true}}}, + [], + [0, 0] /* expectedChangeStreamDocsReturned */, + [0, 0] /* expectedOplogCursorReturnedDocs */); + + // Test out the '{$exists: false}' predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: {$exists: false}}}, + [[op], [op], [op], [op]], + [2, 2] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); + + return; + } + + // Initialize 'doc' so that it matches the 'fullDocument' field of one of the events where + // operationType == 'op'. For all 'insert' events, 'fullDocument' only has the '_id' field and + // the 'shard' field. For 'replace' and 'update' events, 'fullDocument' also has a 'foo' field. 
+ const doc = {_id: 2, shard: 0}; + if (op != "insert") { + doc.foo = (op == "replace" ? "a" : "b"); + } + + // Note: for operations of type 'update', the 'fullDocument' field is not populated until midway + // through the pipeline. We therefore cannot rewrite predicates on 'fullDocument' into the oplog + // for this operation type. As a result, the tests below verify that the number of documents + // returned by the oplog scan are different for updates than for other event types. + + // Test out a predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: doc}}, + [[op, 2, 0]], + [1, 0] /* expectedChangeStreamDocsReturned */, + op != "update" ? [1, 0] : [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a predicate on 'fullDocument._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "fullDocument._id": {$lt: 3}}}, + [[op, 2, 0], [op, 2, 1]], + [1, 1] /* expectedChangeStreamDocsReturned */, + op != "update" ? [1, 1] : [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a predicate on 'fullDocument.shard'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "fullDocument.shard": {$gt: 0}}}, + [[op, 2, 1], [op, 3, 1]], + [0, 2] /* expectedChangeStreamDocsReturned */, + op != "update" ? [0, 2] : [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: {$not: {$eq: doc}}}}, + [[op, 3, 0], [op, 2, 1], [op, 3, 1]], + [1, 2] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on 'fullDocument._id'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "fullDocument._id": {$not: {$lt: 3}}}}, + [[op, 3, 0], [op, 3, 1]], + [1, 1] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out a negated predicate on 'fullDocument.shard'. + verifyOps(resumeAfterToken, + {$match: {operationType: op, "fullDocument.shard": {$not: {$gt: 0}}}}, + [[op, 2, 0], [op, 3, 0]], + [2, 0] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out the '{$exists: true}' predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: {$exists: true}}}, + [[op, 2, 0], [op, 3, 0], [op, 2, 1], [op, 3, 1]], + [2, 2] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); + + // Test out the '{$exists: false}' predicate on the full 'fullDocument' field. + verifyOps(resumeAfterToken, + {$match: {operationType: op, fullDocument: {$exists: false}}}, + [], + [0, 0] /* expectedChangeStreamDocsReturned */, + [2, 2] /* expectedOplogCursorReturnedDocs */); +}; + +// Verify '$match's on the 'update' operation type with various predicates get rewritten correctly. +// We intentionally do this before executing the 'delete' operations so that post-image lookup will +// be successful. +runVerifyOpsTestcases("update"); + +// Now that we're done verifying 'update' events, we can execute the 'delete' operations. +runDeleteOps(); + +// Ensure that '$match' on 'insert', 'replace', and 'delete' operation types with various predicates +// are rewritten correctly. 
+runVerifyOpsTestcases("insert"); +runVerifyOpsTestcases("replace"); +runVerifyOpsTestcases("delete"); + +st.stop(); +})(); diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js new file mode 100644 index 00000000000..4972f1fa90a --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_namespace_rewrite.js @@ -0,0 +1,601 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the 'namespace' +// and apply it to oplog-format documents in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_and_rewrite"; +const collName = "coll1"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// Create a sharded collection. +const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */); + +// A helper that opens a change stream on the whole cluster with the user supplied match expression +// 'userMatchExpr' and validates that: +// 1. for each shard, the events are seen in that order as specified in 'expectedResult' +// 2. the filtering is been done at oplog level +function verifyOnWholeCluster( + resumeAfterToken, userMatchExpr, expectedResult, expectedOplogRetDocsForEachShard) { + const cursor = db.getSiblingDB("admin").aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ]); + + for (const [collOrDb, opDict] of Object.entries(expectedResult)) { + for (const [op, eventIdentifierList] of Object.entries(opDict)) { + eventIdentifierList.forEach(eventIdentifier => { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + + if (op == "dropDatabase") { + assert.eq(event.ns.db, eventIdentifier, event); + } else if (op == "insert") { + assert.eq(event.documentKey._id, eventIdentifier, event); + } else if (op == "rename") { + assert.eq(event.to.coll, eventIdentifier, event); + } else if (op == "drop") { + assert.eq(event.ns.coll, eventIdentifier); + } else { + assert(false, event); + } + + if (op != "dropDatabase") { + assert.eq(event.ns.coll, collOrDb); + } + }); + } + } + + assert(!cursor.hasNext()); + + const stats = db.getSiblingDB("admin").runCommand({ + explain: { + aggregate: 1, + pipeline: [ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ], + cursor: {batchSize: 0} + }, + verbosity: "executionStats" + }); + + assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard); + assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard); +} + +// Create some new collections to ensure that test cases has sufficient namespaces to verify +// that the namespace filtering is working correctly. 
+const coll2 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll2", 4 /* splitAt */); +const coll3 = + createShardedCollection(st, "_id" /* shardKey */, dbName, "coll.coll3", 6 /* splitAt */); +const coll4 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll4", 10 /* splitAt */); + +// Open a change stream and store the resume token. This resume token will be used to replay the +// stream after this point. +const resumeAfterToken = + db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken(); + +// For each collection, insert 2 documents, one on each shard. These will create oplog events and +// change stream will apply various namespace filtering on these collections to verify that the +// namespace is rewritten correctly. Each documents also contains field names matching with that of +// '$cmd' operations, ie. 'renameCollection', 'drop' and 'dropDatabase', but with value-type other +// than strings. The 'ns' match filters should gracefully handle the type mismatch and not throw any +// error. +// +// Each of these inserted documents will be represented in this form in the oplog: +// {... "o": {"_id": <id>, "renameCollection": true, "drop": {}, "dropDatabase": null}, ...} +// +// A few collections are renamed and dropped to verify that these are filtered properly. +assert.commandWorked(coll.insert({_id: 1, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked(coll.insert({_id: 2, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked(coll2.insert({_id: 3, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked(coll2.insert({_id: 4, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked(coll2.renameCollection("newColl2")); +assert.commandWorked(coll3.insert({_id: 5, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked(coll3.insert({_id: 6, renameCollection: true, drop: {}, dropDatabase: null})); +assert(coll3.drop()); + +// Insert some documents into 'coll4' with field names which match known command types. Despite the +// fact that these documents could potentially match with the partial 'ns' filter we rewrite into +// the oplog, the {op: "c"} predicate we add into the filter should ensure that they are correctly +// discarded. +assert.commandWorked(coll4.insert( + {_id: 7, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1})); +assert.commandWorked(coll4.insert({_id: 8, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked( + coll4.insert({_id: 9, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""})); +assert.commandWorked(coll4.insert( + {_id: 10, renameCollection: coll2.getName(), drop: coll3.getName(), dropDatabase: 1})); +assert.commandWorked(coll4.insert({_id: 11, renameCollection: true, drop: {}, dropDatabase: null})); +assert.commandWorked( + coll4.insert({_id: 12, renameCollection: "no_dot_ns", drop: "", dropDatabase: ""})); + +// This group of tests ensures that the '$match' on a particular namespace object only sees its +// documents and only required document(s) are returned at the oplog for each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll1"}}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll2"}}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll.coll3"}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the namespace with only db component should not emit any document and +// the oplog should not return any documents. +verifyOnWholeCluster( + resumeAfterToken, {$match: {ns: {db: dbName}}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the namespace object with 'unknown' collection does not exists and the oplog cursor +// returns 0 document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "unknown"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the namespace object with flipped fields does not match with the namespace object and +// the oplog cursor returns 0 document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {coll: "coll1", db: dbName}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the namespace object with extra fields does not match with the namespace object and +// the oplog cursor returns 0 document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll1", extra: "extra"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the empty namespace object does not match with the namespace object and the oplog +// cursor returns 0 document. +verifyOnWholeCluster(resumeAfterToken, {$match: {ns: {}}}, {}, 0); + +// Ensure the '$match' on namespace's db should return documents for all collection and oplog should +// return all documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": dbName}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// These cases ensure that the '$match' on regex of namespace' db, should return documents for all +// collection and oplog should return all documents for each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^change_stream_match_pushdown.*$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^(change_stream_match_pushdown.*$)/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /(^unknown$|^change_stream_match_pushdown.*$)/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": /^unknown$|^change_stream_match_pushdown.*$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on non-existing db should not return any document and oplog should not +// return any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.db": "unknown"}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on empty db should not return any document and oplog should not return +// any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on sub field of db should not return any document and oplog should not +// return any document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db.extra": dbName}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensures that the '$match' on collection field path should emit only the +// required documents and oplog should return only required document(s) for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll1"}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll2"}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "coll.coll3"}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensures that the '$match' on the regex of the collection field path should +// emit only the required documents and oplog should return only required document(s) for each +// shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*1/}}, + {coll1: {insert: [1, 2]}}, + 1 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*2/}}, + {coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*3/}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensures that the '$match' on the regex matching all collections should return +// documents from all collection and oplog should return all document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^CoLL.*/i}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the regex matching 3 collection should return documents from these +// collections and oplog should return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^col.*1$|^col.*2$|^col.*3$/}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]} + }, + 5 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the regex to exclude 'coll1', 'coll2' and 'coll4' should return only +// documents from 'coll.coll3' and oplog should return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": /^coll[^124]/}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on non-existing collection should not return any document and oplog +// should not return any document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": "unknown"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on empty collection should not return any document and oplog should not +// return any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on sub field of collection should not return any document and oplog +// should not return any document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll.extra": "coll1"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' on db should return all documents and oplog should return all documents for +// each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [dbName]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensures that '$in' on regex matching the db name should return all documents +// and oplog should return all documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [/^change_stream_match.*$/]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [/^change_stream_MATCH.*$/i]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that an empty '$in' on db path should not match any collection and oplog should not return +// any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"ns.db": {$in: []}}}, {}, 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' with invalid db cannot be rewritten and oplog should return all documents for +// each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: [dbName, 1]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' on db path with mix of string and regex can be rewritten and oplog should +// return '0' document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$in: ["unknown1", /^unknown2$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' on multiple collections should return the required documents and oplog should +// return required documents for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns": {$in: [{db: dbName, coll: "coll1"}, {db: dbName, coll: "coll2"}]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", "coll2"]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' on regex of multiple collections should return the required documents and oplog +// should return required documents for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"ns.coll": {$in: [/^coll1$/, /^coll2$/]}}}, + {coll1: {insert: [1, 2]}, coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}}, + 3 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensures that '$in' on regex of matching all collections should return all +// documents and oplog should return all documents for each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: [/^coll.*$/]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: [/^COLL.*$/i]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that an empty '$in' should not match any collection and oplog should not return any +// document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: []}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' with invalid collection cannot be rewritten and oplog should return all +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", 1]}}}, + {coll1: {insert: [1, 2]}}, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' with mix of string and regex matching collections can be rewritten and oplog +// should return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: ["coll1", /^coll.*3$/]}}}, + { + coll1: {insert: [1, 2]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + }, + 3 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$in' with mix of string and regex can be rewritten and oplog should return '0' +// document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$in: ["unknown1", /^unknown2$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// This group of tests ensure that '$nin' on db path should return all documents and oplog should +// return all documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$nin: []}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$nin: ["unknown"]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$nin: [/^unknown$/]}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// These group of tests ensure that '$nin' on matching db name should not return any documents and +// oplog should return '0' documents for each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$nin: [dbName]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$nin' on multiple collections should return the required documents and oplog should +// return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: ["coll1", "coll2", "coll4"]}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$nin' on regex of multiple collections should return the required documents and +// oplog should return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: [/^coll1$/, /^coll2$/, /^coll4$/]}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$nin' on regex of matching all collections should not return any document and oplog +// should return '0' documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: [/^coll.*$/, /^sys.*$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that an empty '$nin' should match all collections and oplog should return all documents +// for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: []}}}, + { + coll1: {insert: [1, 2]}, + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + "coll4": {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$nin' with invalid collection cannot be rewritten and oplog should return all +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: ["coll1", 1]}}}, + { + coll2: {insert: [3, 4], rename: ["newColl2", "newColl2"]}, + "coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}, + coll4: {insert: [7, 8, 9, 10, 11, 12]} + }, + 8 /* expectedOplogRetDocsForEachShard */); + +// Ensure that '$nin' with mix of string and regex can be rewritten and oplog should return required +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"ns.coll": {$nin: ["coll1", /^coll2$/, "coll4"]}}}, + {"coll.coll3": {insert: [5, 6], drop: ["coll.coll3", "coll.coll3"]}}, + 2 /* expectedOplogRetDocsForEachShard */); + +// At this stage, the coll2 has been renamed to 'newColl2' and coll3 has been dropped. The test from +// here will drop the database and ensure that the 'ns' filter when applied over the collection +// should only emit the 'drop' event for that collection and not the 'dropDatabase' event. It should +// be noted that for 'newColl2' and 'coll3', the 'dropDatabase' will be no-op and will not emit any +// documents. + +// Open a new change streams and verify that from here onwards the events related to 'dropDatabase' +// are seen. +const secondResumeAfterToken = + db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken(); + +assert.commandWorked(db.dropDatabase()); + +// This group of tests ensures that the match on 'coll1' only sees the 'drop' events. 
+verifyOnWholeCluster(secondResumeAfterToken, + {$match: {ns: {db: dbName, coll: "coll1"}}}, + {coll1: {drop: ["coll1", "coll1"]}}, + 1 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(secondResumeAfterToken, + {$match: {"ns.coll": "coll1"}}, + {coll1: {drop: ["coll1", "coll1"]}}, + 1 /* expectedOplogRetDocsForEachShard */); +verifyOnWholeCluster(secondResumeAfterToken, + {$match: {"ns.coll": /^col.*1/}}, + {coll1: {drop: ["coll1", "coll1"]}}, + 1 /* expectedOplogRetDocsForEachShard */); + +verifyOnWholeCluster( + secondResumeAfterToken, + {$match: {ns: {db: dbName}}}, + {change_stream_match_pushdown_and_rewrite_and_rewrite: {dropDatabase: [dbName, dbName]}}, + 1 /* expectedOplogRetDocsForEachShard */); + +st.stop(); +})(); diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js new file mode 100644 index 00000000000..61ce95e1e79 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_operation_type_rewrite.js @@ -0,0 +1,283 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the +// 'operationType' and apply it to oplog-format documents in order to filter out results as early as +// possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_and_rewrite"; +const collName = "coll1"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// Create a sharded collection. +const coll = createShardedCollection(st, "_id" /* shardKey */, dbName, collName, 2 /* splitAt */); + +// A helper that opens a change stream with the user supplied match expression 'userMatchExpr' and +// validates that, +// 1. for each shard, the events are seen in that order as specified in 'expectedOps' +// 2. each shard returns the expected number of events +// 3. the filtering is been done at oplog level +// +// Note that invalidating events cannot be tested by this function, since these will cause the +// explain used to verify the oplog-level rewrites to fail. +function verifyNonInvalidatingOps( + resumeAfterToken, userMatchExpr, expectedOps, expectedOplogRetDocsForEachShard) { + const cursor = + coll.aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]); + + // For shard1, document id is '1' and for shard2, document id is '2'. + const docIds = [1, 2]; + + for (const op of expectedOps) { + docIds.forEach(docId => { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + assert.eq(event.documentKey._id, docId, event); + }); + } + + assert(!cursor.hasNext()); + + // An 'executionStats' could only be captured for a non-invalidating stream. 
+ const stats = coll.explain("executionStats") + .aggregate([{$changeStream: {resumeAfter: resumeAfterToken}}, userMatchExpr]); + + assertNumChangeStreamDocsReturnedFromShard(stats, st.rs0.name, expectedOps.length); + assertNumChangeStreamDocsReturnedFromShard(stats, st.rs1.name, expectedOps.length); + + assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard); + assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard); +} + +// Open a change stream and store the resume token. This resume token will be used to replay the +// stream after this point. +const resumeAfterToken = coll.watch([]).getResumeToken(); + +// These operations will create oplog events. The change stream will apply several filters on these +// series of events and ensure that the '$match' expressions are rewritten correctly. +assert.commandWorked(coll.insert({_id: 1})); +assert.commandWorked(coll.insert({_id: 2})); +assert.commandWorked(coll.update({_id: 1}, {$set: {foo: "bar"}})); +assert.commandWorked(coll.update({_id: 2}, {$set: {foo: "bar"}})); +assert.commandWorked(coll.replaceOne({_id: 1}, {_id: 1, foo: "baz"})); +assert.commandWorked(coll.replaceOne({_id: 2}, {_id: 2, foo: "baz"})); +assert.commandWorked(coll.deleteOne({_id: 1})); +assert.commandWorked(coll.deleteOne({_id: 2})); + +// Ensure that the '$match' on the 'insert' operation type is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: "insert"}}, + ["insert"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the 'update' operation type is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: "update"}}, + ["update"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the 'replace' operation type is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: "replace"}}, + ["replace"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the 'delete' operation type is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: "delete"}}, + ["delete"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the operation type as number is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: 1}}, + [] /* expectedOps */, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on an unknown operation type cannot be rewritten to the oplog format. +// The oplog cursor should return '4' documents. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: "unknown"}}, + [] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on an empty string operation type cannot be rewritten to the oplog +// format. The oplog cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: ""}}, + [] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on operation type with inequality operator cannot be rewritten to the +// oplog format. The oplog cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$gt: "insert"}}}, + ["update", "replace"], + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on operation type sub-field can be rewritten to the oplog format. 
The +// oplog cursor should return '0' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {"operationType.subField": "subOperation"}}, + [] /* expectedOps */, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the operation type with '$in' is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$in: ["insert", "update"]}}}, + ["insert", "update"], + 2 /* expectedOplogRetDocsForEachShard */); + +// Ensure that for the '$in' with one valid and one invalid operation type, rewrite to the oplog +// format will be abandoned. The oplog cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$in: ["insert", "unknown"]}}}, + ["insert"], + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' with '$in' on an unknown operation type cannot be rewritten. The oplog +// cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$in: ["unknown"]}}}, + [] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' with '$in' with operation type as number is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$in: [1]}}}, + [] /* expectedOps */, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' with '$in' with operation type as a string and a regex cannot be +// rewritten. The oplog cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$in: [/^insert$/, "update"]}}}, + ["insert", "update"] /* expectedOps */, + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' on the operation type with '$nin' is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$nin: ["insert"]}}}, + ["update", "replace", "delete"], + 3 /* expectedOplogRetDocsForEachShard */); + +// Ensure that for the '$nin' with one valid and one invalid operation type, rewrite to the oplog +// format will be abandoned. The oplog cursor should return '4' documents for each shard. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$nin: ["insert", "unknown"]}}}, + ["update", "replace", "delete"], + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' with '$nin' with operation type as number is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {operationType: {$nin: [1]}}}, + ["insert", "update", "replace", "delete"], + 4 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' to match only 'insert' operations is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {$expr: {$eq: ["$operationType", "insert"]}}}, + ["insert"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' to match only 'update' operations is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {$expr: {$eq: ["$operationType", "update"]}}}, + ["update"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' to match only 'replace' operations is rewritten correctly. 
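// For context, these rewrites are possible because change stream operation types correspond to
// oplog 'op' codes roughly as follows: "insert" -> {op: "i"}, "delete" -> {op: "d"}, and both
// "update" and "replace" -> {op: "u"} (distinguished by whether the 'o' field carries a full
// replacement document or an update description). An equality or $expr comparison on
// 'operationType' can therefore be translated into a predicate on 'op'.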
+verifyNonInvalidatingOps(resumeAfterToken, + {$match: {$expr: {$eq: ["$operationType", "replace"]}}}, + ["replace"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' to match only 'delete' operations is rewritten correctly. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {$expr: {$eq: ["$operationType", "delete"]}}}, + ["delete"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' is rewritten correctly when comparing with 'unknown' +// operation type. +verifyNonInvalidatingOps(resumeAfterToken, + {$match: {$expr: {$eq: ["$operationType", "unknown"]}}}, + [] /* expectedOps */, + 0 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' is rewritten correctly when '$and' is in the expression. +verifyNonInvalidatingOps(resumeAfterToken, + { + $match: { + $expr: { + $and: [ + {$gte: [{$indexOfCP: ["$operationType", "l"]}, 0]}, + {$gte: [{$indexOfCP: ["$operationType", "te"]}, 0]} + ] + } + } + }, + ["delete"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' is rewritten correctly when '$or' is in the expression. +verifyNonInvalidatingOps(resumeAfterToken, + { + $match: { + $expr: { + $or: [ + {$gte: [{$indexOfCP: ["$operationType", "l"]}, 0]}, + {$gte: [{$indexOfCP: ["$operationType", "te"]}, 0]} + ] + } + } + }, + ["update", "replace", "delete"], + 3 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' is rewritten correctly when '$not' is in the expression. +verifyNonInvalidatingOps( + resumeAfterToken, + {$match: {$expr: {$not: {$regexMatch: {input: "$operationType", regex: /e$/}}}}}, + ["insert"], + 1 /* expectedOplogRetDocsForEachShard */); + +// Ensure that the '$match' using '$expr' is rewritten correctly when nor ({$not: {$or: [...]}}) is +// in the expression. +verifyNonInvalidatingOps(resumeAfterToken, + { + $match: { + $expr: { + $not: { + $or: [ + {$eq: ["$operationType", "insert"]}, + {$eq: ["$operationType", "delete"]}, + ] + } + } + } + }, + ["update", "replace"], + 2 /* expectedOplogRetDocsForEachShard */); + +st.stop(); +})(); diff --git a/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js new file mode 100644 index 00000000000..d0254ce9145 --- /dev/null +++ b/jstests/change_streams/oplog_rewrite/change_stream_match_pushdown_to_rewrite.js @@ -0,0 +1,442 @@ +// Test that a pipeline of the form [{$changeStream: {}}, {$match: ...}] can rewrite the 'to' and +// apply it to oplog-format documents in order to filter out results as early as possible. +// @tags: [ +// featureFlagChangeStreamsRewrite, +// requires_fcv_51, +// requires_pipeline_optimization, +// requires_sharding, +// uses_change_streams, +// change_stream_does_not_expect_txns, +// assumes_unsharded_collection, +// assumes_read_preference_unchanged +// ] +(function() { +"use strict"; + +load("jstests/libs/change_stream_rewrite_util.js"); // For rewrite helpers. + +const dbName = "change_stream_match_pushdown_and_rewrite"; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); + +// A helper that opens a change stream on the whole cluster with the user supplied match expression +// 'userMatchExpr' and validates that: +// 1. 
for each shard, the events are seen in that order as specified in 'expectedResult' +// 2. the filtering is been done at oplog level +function verifyOnWholeCluster( + resumeAfterToken, userMatchExpr, expectedResult, expectedOplogRetDocsForEachShard) { + const cursor = db.getSiblingDB("admin").aggregate([ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ]); + + for (const [coll, opDict] of Object.entries(expectedResult)) { + for (const [op, eventIdentifierList] of Object.entries(opDict)) { + eventIdentifierList.forEach(eventIdentifier => { + assert.soon(() => cursor.hasNext()); + const event = cursor.next(); + assert.eq(event.operationType, op, event); + + if (op == "insert") { + assert.eq(event.documentKey._id, eventIdentifier, event); + } else if (op == "rename") { + assert.eq(event.to.coll, eventIdentifier, event); + } else { + assert(false, event); + } + assert.eq(event.ns.coll, coll); + }); + } + } + + assert(!cursor.hasNext()); + + const stats = db.getSiblingDB("admin").runCommand({ + explain: { + aggregate: 1, + pipeline: [ + {$changeStream: {resumeAfter: resumeAfterToken, allChangesForCluster: true}}, + userMatchExpr + ], + cursor: {batchSize: 0} + }, + verbosity: "executionStats" + }); + + assertNumMatchingOplogEventsForShard(stats, st.rs0.name, expectedOplogRetDocsForEachShard); + assertNumMatchingOplogEventsForShard(stats, st.rs1.name, expectedOplogRetDocsForEachShard); +} + +// Create some new collections to ensure that test cases has sufficient namespaces to verify that +// the filtering on the 'to' field is working correctly. +const coll1 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll1", 5 /* splitAt */); +const coll2 = createShardedCollection(st, "_id" /* shardKey */, dbName, "coll2", 9 /* splitAt */); + +const ns1 = dbName + ".newColl1"; +const ns2 = dbName + ".newColl2"; + +// Open a change stream and store the resume token. This resume token will be used to replay the +// stream after this point. +const resumeAfterToken = + db.getSiblingDB("admin").watch([], {allChangesForCluster: true}).getResumeToken(); + +// The inserted documents purposely contain field name 'o.to' to match with that of oplog's 'to' +// field. These fields should not interfere with the rewritten predicate and only expected documents +// should be returned at the oplog level. +assert.commandWorked(coll1.insert({_id: 3, "o.to": 3})); +assert.commandWorked(coll1.insert({_id: 4, "o.to": ""})); +assert.commandWorked(coll1.insert({_id: 5, "o.to": ns2})); +assert.commandWorked(coll1.insert({_id: 6, "o.to": dbName})); +assert.commandWorked(coll1.renameCollection("newColl1")); +assert.commandWorked(coll2.insert({_id: 7, "o.to": 3})); +assert.commandWorked(coll2.insert({_id: 8, "o.to": ""})); +assert.commandWorked(coll2.insert({_id: 9, "o.to": ns1})); +assert.commandWorked(coll2.insert({_id: 10, "o.to": dbName})); +assert.commandWorked(coll2.renameCollection("newColl2")); + +// This group of tests ensures that the '$match' on the 'to' object only sees its documents and only +// required document(s) are returned at the oplog for each shard. 
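// For reference, renameCollection is logged in the oplog as a command entry of roughly the
// following shape (abbreviated; exact fields vary by server version), which is why predicates on
// the change stream's 'to' field can be evaluated against the oplog's 'o.to' field, and why the
// documents above deliberately contain a literal "o.to" field name:
//   {op: "c", ns: "change_stream_match_pushdown_and_rewrite.$cmd",
//    o: {renameCollection: "change_stream_match_pushdown_and_rewrite.coll1",
//        to: "change_stream_match_pushdown_and_rewrite.newColl1", ...}, ...}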
+verifyOnWholeCluster(resumeAfterToken, + {$match: {to: {db: dbName, coll: "newColl1"}}}, + {coll1: {rename: ["newColl1", "newColl1"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {to: {db: dbName, coll: "newColl2"}}}, + {coll2: {rename: ["newColl2", "newColl2"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on the 'to' object with only db component should not emit any document +// and the oplog should not return any documents. +verifyOnWholeCluster( + resumeAfterToken, {$match: {to: {db: dbName}}}, {}, 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the 'to' object with 'unknown' collection does not exists and the oplog cursor +// returns 0 document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {to: {db: dbName, coll: "unknown"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the 'to' object with flipped fields does not match and the oplog cursor returns 0 +// document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {to: {coll: "newColl1", db: dbName}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the 'to' object with extra fields does not match and the oplog cursor returns 0 +// document. +verifyOnWholeCluster(resumeAfterToken, + {$match: {to: {db: dbName, coll: "newColl1", extra: "extra"}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the empty 'to' object does not match and the oplog cursor returns 0 document. +verifyOnWholeCluster(resumeAfterToken, {$match: {to: {}}}, {}, 0); + +// Ensure the '$match' on the db field path should only return documents with rename op type for all +// collections and oplog should also return same for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": dbName}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": /^change_stream_match_pushdown.*$/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": /^(change_stream_match_pushdown.*$)/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": /^(Change_Stream_MATCH_PUSHDOWN.*$)/i}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": /(^unknown$|^change_stream_match_pushdown.*$)/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": /^unknown$|^change_stream_match_pushdown.*$/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on non-existing db should not return any document and oplog should not +// return any document for each shard. 
+verifyOnWholeCluster( + resumeAfterToken, {$match: {"to.db": "unknown"}}, {}, 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on empty db should not return any document and oplog should not return +// any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"to.db": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on sub field of db should not return any document and oplog should not +// return any document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db.extra": dbName}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// This group of tests ensures that the '$match' on collection field path should emit only the +// required documents and oplog should return only required document(s) for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": "newColl1"}}, + {coll1: {rename: ["newColl1", "newColl1"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": "newColl2"}}, + {coll2: {rename: ["newColl2", "newColl2"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": /^newColl.*1/}}, + {coll1: {rename: ["newColl1", "newColl1"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": /^newColl.*2/}}, + {coll2: {rename: ["newColl2", "newColl2"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); + +// This group of tests ensures that the '$match' on the regex matching all collections should return +// documents with rename op type and oplog should also return same for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": /^newColl.*/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": /^newColl.*/i}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": /^newColl.*1$|^newColl.*2$/}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on the regex to exclude 'coll1' should return only documents from +// 'coll2' and oplog should return required documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": /^newColl[^1]/}}, + {coll2: {rename: ["newColl2", "newColl2"]}}, + 1 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on non-existing collection should not return any document and oplog +// should not return any document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": "unknown"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on empty collection should not return any document and oplog should not +// return any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"to.coll": ""}}, {}, 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that the '$match' on sub field of collection should not return any document and oplog +// should not return any document for each shard. 
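// In a change stream event, 'to.coll' (like 'ns.coll') is always a plain string, so a dotted path
// such as 'to.coll.extra' can never match any event; the rewrite can therefore safely return zero
// oplog documents for such predicates.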
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll.extra": "coll1"}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' on db should return all documents with rename op type and oplog should also +// return same each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": {$in: [dbName, "unknown"]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": {$in: [/^change_stream_match.*$/, /^unknown$/]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": {$in: [/^change_stream_MATCH.*$/i, /^unknown$/i]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that an empty '$in' on db path should not match any collection and oplog should not return +// any document for each shard. +verifyOnWholeCluster( + resumeAfterToken, {$match: {"to.db": {$in: []}}}, {}, 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' with invalid db cannot be rewritten and oplog should return all documents for +// each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": {$in: [dbName, 1]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 6 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' on db path with mix of string and regex can be rewritten and oplog should +// return '0' document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db": {$in: ["unknown1", /^unknown2$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' on multiple collections should return the required documents and oplog should +// return required documents for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to": {$in: [{db: dbName, coll: "newColl1"}, {db: dbName, coll: "newColl2"}]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": {$in: ["newColl1", "newColl2"]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": {$in: [/^newColl1$/, /^newColl2$/]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// This group of tests ensures that '$in' on regex of matching all collections should return all +// documents with rename op type and oplog should also return same for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": {$in: [/^newColl.*$/, /^unknown.*/]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": {$in: [/^newColl.*$/i, /^unknown/i]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that an empty '$in' should not match any collection and oplog should not return any +// document for each shard. 
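// Throughout these '$in'/'$nin' cases, the namespace rewrite only understands string and regex
// elements; an array containing any other type (e.g. the number 1) cannot be rewritten, so the
// oplog scan returns every entry on the shard (six per shard here) and the filtering happens
// later in the pipeline, although the change stream results themselves remain correct.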
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$in: []}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' with invalid collection cannot be rewritten and oplog should return all +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$in: ["newColl1", 1]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}}, + 6 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' with mix of string and regex matching collections can be rewritten and oplog +// should return required documents for each shard. +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.coll": {$in: ["newColl1", /^newColl.*2$/]}}}, + {coll1: {rename: ["newColl1", "newColl1"]}, coll2: {rename: ["newColl2", "newColl2"]}}, + 2 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$in' with mix of string and regex can be rewritten and oplog should return '0' +// document for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$in: ["unknown1", /^unknown2$/]}}}, + {}, + 0 /* expectedOplogRetDocsForEachShard*/); + +// This group of tests ensure that '$nin' on db path should return all documents and oplog should +// return all documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db": {$nin: []}}}, + { + coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 6 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db": {$nin: ["unknown1", "unknown2"]}}}, + { + coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 6 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db": {$nin: [/^unknown1$/, /^unknown2$/]}}}, + { + coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 6 /* expectedOplogRetDocsForEachShard*/); + +// These group of tests ensure that '$nin' on matching db name should not return any documents with +// rename op type and oplog should also return same each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.db": {$nin: [dbName, "unknown"]}}}, + {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}}, + 4 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster( + resumeAfterToken, + {$match: {"to.db": {$nin: [/change_stream_match_pushdown_and_rewr.*/, /^unknown.*/]}}}, + {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}}, + 4 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$nin' on a collection should return the documents with only insert op type for that +// collection and oplog should also return same for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: ["newColl1", "unknown"]}}}, + { + coll1: {insert: [3, 4, 5, 6]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 5 /* expectedOplogRetDocsForEachShard*/); +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: [/^newColl1$/, /^unknown$/]}}}, + { + coll1: {insert: [3, 4, 5, 6]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 5 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$nin' on regex of matching all collections should only return documents with op type +// insert and oplog should also return same for each shard. 
+verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: [/^newColl.*$/, /^unknown.*$/]}}}, + {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}}, + 4 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that an empty '$nin' should match all collections and oplog should return all documents +// for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: []}}}, + { + coll1: {insert: [3, 4, 5, 6], rename: ["newColl1", "newColl1"]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 6 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$nin' with invalid collection cannot be rewritten and oplog should return all +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: ["newColl1", 1]}}}, + { + coll1: {insert: [3, 4, 5, 6]}, + coll2: {insert: [7, 8, 9, 10], rename: ["newColl2", "newColl2"]} + }, + 6 /* expectedOplogRetDocsForEachShard*/); + +// Ensure that '$nin' with mix of string and regex can be rewritten and oplog should return required +// documents for each shard. +verifyOnWholeCluster(resumeAfterToken, + {$match: {"to.coll": {$nin: ["newColl1", /^newColl2$/]}}}, + {coll1: {insert: [3, 4, 5, 6]}, coll2: {insert: [7, 8, 9, 10]}}, + 4 /* expectedOplogRetDocsForEachShard*/); + +st.stop(); +})(); |
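All of these tests exercise the same pattern: capture a resume token, generate a handful of
events, replay the stream with a user-supplied $match, and then re-run the identical pipeline
under explain("executionStats") to confirm how many oplog entries each shard actually returned.
The snippet below is a minimal, self-contained sketch of that pattern for experimenting in the
mongo shell against a sharded cluster; the database and collection names are placeholders, and the
exact location of the per-shard counters inside the explain output (printed here rather than
asserted on) varies by server version.

// Minimal sketch of the resume / replay / explain pattern used throughout these tests.
// 'pushdownTest' and 'coll' are placeholder names; run via mongos with the change streams
// rewrite feature enabled.
const testDB = db.getSiblingDB("pushdownTest");
const testColl = testDB.getCollection("coll");

// Capture a resume point, then generate a few events to replay.
const token = testColl.watch([]).getResumeToken();
assert.commandWorked(testColl.insert({_id: 1}));
assert.commandWorked(testColl.update({_id: 1}, {$set: {x: 1}}));
assert.commandWorked(testColl.remove({_id: 1}));

// Replay from the token with a filter that the server may push down to the oplog scan.
const pipeline = [{$changeStream: {resumeAfter: token}}, {$match: {operationType: "insert"}}];
const cursor = testColl.aggregate(pipeline);
assert.soon(() => cursor.hasNext());
assert.eq(cursor.next().operationType, "insert");
assert(!cursor.hasNext());

// Re-run the same pipeline under explain to inspect, per shard, how many oplog entries survived
// the (possibly rewritten) filter.
const stats = testColl.explain("executionStats").aggregate(pipeline);
printjson(stats);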