diff options
author | Kyle Suarez <kyle.suarez@mongodb.com> | 2022-05-16 19:56:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-16 20:19:29 +0000 |
commit | 5cc18bcd682fc59cd3714800a69f34246049caee (patch) | |
tree | 73267de7f272f6b8fe9f4b906f685010159f8d42 | |
parent | 04e49a5d48ac6e9486f196806efae1ed11d79948 (diff) | |
download | mongo-5cc18bcd682fc59cd3714800a69f34246049caee.tar.gz |
SERVER-65859 Enable filtering of 'fromMigrate' change steam events for individual operations within applyOps array
-rw-r--r-- | jstests/sharding/change_stream_no_orphans.js | 48 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp | 7 |
2 files changed, 27 insertions, 28 deletions
diff --git a/jstests/sharding/change_stream_no_orphans.js b/jstests/sharding/change_stream_no_orphans.js index ce4d35c847c..4ee6fb4f562 100644 --- a/jstests/sharding/change_stream_no_orphans.js +++ b/jstests/sharding/change_stream_no_orphans.js @@ -53,16 +53,16 @@ let suspendRangeDeletionShard0 = configureFailPoint(st.shard0, 'suspendRangeDele assert.commandWorked( st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName})); assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}})); -const coll = st.s.getCollection(collNS); -assert.commandWorked(coll.insert({_id: -2, name: 'emma', age: 20})); // Test case 4 -assert.commandWorked(coll.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3 -assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 30})); // Test case 1 -assert.commandWorked(coll.insert({_id: 1, name: 'matt', age: 35})); // Test case 1 -assert.commandWorked(coll.insert({_id: 2, name: 'john', age: 40})); // Test case 2 -assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 45})); // Test case 2 -assert.commandWorked(coll.insert({_id: 4, name: 'robert', age: 50})); // Test case 2 -assert.commandWorked(coll.insert({_id: 5, name: 'james', age: 55})); // Test case 3 -assert.commandWorked(coll.insert({_id: 6, name: 'liam', age: 60})); // Test case 4 +const mongosColl = st.s.getCollection(collNS); +assert.commandWorked(mongosColl.insert({_id: -2, name: 'emma', age: 20})); // Test case 4 +assert.commandWorked(mongosColl.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3 +assert.commandWorked(mongosColl.insert({_id: 0, name: 'matt', age: 30})); // Test case 1 +assert.commandWorked(mongosColl.insert({_id: 1, name: 'matt', age: 35})); // Test case 1 +assert.commandWorked(mongosColl.insert({_id: 2, name: 'john', age: 40})); // Test case 2 +assert.commandWorked(mongosColl.insert({_id: 3, name: 'robert', age: 45})); // Test case 2 +assert.commandWorked(mongosColl.insert({_id: 4, name: 'robert', age: 50})); // Test case 2 +assert.commandWorked(mongosColl.insert({_id: 5, name: 'james', age: 55})); // Test case 3 +assert.commandWorked(mongosColl.insert({_id: 6, name: 'liam', age: 60})); // Test case 4 // Move the chunk to the second shard leaving orphaned documents on the first shard. assert.commandWorked(st.s.adminCommand({split: collNS, middle: {_id: 0}})); @@ -70,7 +70,7 @@ assert.commandWorked( st.s.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName})); // Setup a change stream on the collection to receive real-time events on any data changes. -const changeStream = coll.watch([]); +const changeStream = mongosColl.watch([]); //////////////////////////////////////////////////////////////////////////////////////////////////// // Test case 1: Direct operations to shard on orphaned documents @@ -146,7 +146,7 @@ jsTest.log('A direct delete to a shard of multi-documents does not generate dele jsTest.log('A broadcasted update of a single document generates an update event'); { // Send a broadcasted update (query on non-key field) on a single document to all the shards. - assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 41}}, {multi: true})); + assert.commandWorked(mongosColl.update({name: 'john'}, {$set: {age: 41}}, {multi: true})); // The document is hosted by the second shard and the update event is notified. The first shard // still hosts the orphaned document so no additional event must be notified. @@ -163,7 +163,7 @@ jsTest.log('A broadcasted update of a single document generates an update event' jsTest.log('A broadcasted delete of a single document generates a delete event'); { // Send a broadcasted delete (query on non-key field) on a single document to all the shards. - assert.commandWorked(coll.remove({name: 'john'})); + assert.commandWorked(mongosColl.remove({name: 'john'})); // The document is hosted by the second shard and the delete event is notified. The first shard // still hosts the orphaned document so no additional event must be notified. @@ -180,7 +180,7 @@ jsTest.log('A broadcasted delete of a single document generates a delete event') jsTest.log('A broadcasted update of multi-documents generates more update events'); { // Send a broadcasted update (query on non-key field) on two documents to all the shards. - assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 46}}, {multi: true})); + assert.commandWorked(mongosColl.update({name: 'robert'}, {$set: {age: 46}}, {multi: true})); // The documents are hosted by the second shard and two delete events are notified. The first // shard still hosts the orphaned documents so no additional event must be notified. @@ -201,7 +201,7 @@ jsTest.log('A broadcasted update of multi-documents generates more update events jsTest.log('A broadcasted delete of multi-documents generates more delete events'); { // Send a broadcasted delete (query on non-key field) on two documents to all the shards. - assert.commandWorked(coll.remove({name: 'robert'})); + assert.commandWorked(mongosColl.remove({name: 'robert'})); // The documents are hosted by the second shard and two delete events are notified. The first // shard still hosts the orphaned documents so no additional event must be notified. @@ -296,14 +296,10 @@ jsTest.log('Direct updates (via a transaction to a shard) of both orphaned and o assert.commandWorked(session.commitTransaction_forTesting()); session.endSession(); - // The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one - // update event is notified. - // TODO (SERVER-65859): The second update event will be filtered out when the ticket is - // completed. + // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only + // one update event is notified. assert.soon(() => changeStream.hasNext(), 'A first update event is expected'); assert.eq(changeStream.next().operationType, 'update'); - assert.soon(() => changeStream.hasNext(), 'A second update event is expected'); - assert.eq(changeStream.next().operationType, 'update'); assertNoChanges(changeStream); // Both orphaned (liam) and owned (emma) documents on the shard have been updated. @@ -324,14 +320,10 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o assert.commandWorked(session.commitTransaction_forTesting()); session.endSession(); - // The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one - // update event is notified. - // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is - // completed. + // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only + // one update event is notified. assert.soon(() => changeStream.hasNext(), 'A first delete event is expected'); assert.eq(changeStream.next().operationType, 'delete'); - assert.soon(() => changeStream.hasNext(), 'A second delete event is expected'); - assert.eq(changeStream.next().operationType, 'delete'); assertNoChanges(changeStream); // Both orphaned (liam) and owned (emma) documents on the shard have been removed. @@ -343,7 +335,7 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o jsTest.log('The collection drop generates a drop event'); { - coll.drop(); + mongosColl.drop(); // Essentially, this verifies that the operation before dropping the collection did not notify // additional and unexpected events. diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index 40603a0b1d7..05bfcff4df7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -63,6 +63,13 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter( // filtered out by the default 'ns' filter this stage gets initialized with. auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr)); + // To correctly handle filtering out entries of direct write operations on orphaned documents, + // we include a filter for "fromMigrate" flagged operations, unless "fromMigrate" events are + // explicitly requested in the spec. + if (!expCtx->changeStreamSpec->getShowMigrationEvents()) { + unwindFilter->add(buildNotFromMigrateFilter(expCtx, userMatch)); + } + // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do // this separately because we need to exclude certain fields from the user's filters. Unwound // transaction events do not have these fields until we populate them from the commitTransaction |