summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyle Suarez <kyle.suarez@mongodb.com>2022-05-16 19:56:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-16 20:19:29 +0000
commit5cc18bcd682fc59cd3714800a69f34246049caee (patch)
tree73267de7f272f6b8fe9f4b906f685010159f8d42
parent04e49a5d48ac6e9486f196806efae1ed11d79948 (diff)
downloadmongo-5cc18bcd682fc59cd3714800a69f34246049caee.tar.gz
SERVER-65859 Enable filtering of 'fromMigrate' change steam events for individual operations within applyOps array
-rw-r--r--jstests/sharding/change_stream_no_orphans.js48
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp7
2 files changed, 27 insertions, 28 deletions
diff --git a/jstests/sharding/change_stream_no_orphans.js b/jstests/sharding/change_stream_no_orphans.js
index ce4d35c847c..4ee6fb4f562 100644
--- a/jstests/sharding/change_stream_no_orphans.js
+++ b/jstests/sharding/change_stream_no_orphans.js
@@ -53,16 +53,16 @@ let suspendRangeDeletionShard0 = configureFailPoint(st.shard0, 'suspendRangeDele
assert.commandWorked(
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}}));
-const coll = st.s.getCollection(collNS);
-assert.commandWorked(coll.insert({_id: -2, name: 'emma', age: 20})); // Test case 4
-assert.commandWorked(coll.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3
-assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 30})); // Test case 1
-assert.commandWorked(coll.insert({_id: 1, name: 'matt', age: 35})); // Test case 1
-assert.commandWorked(coll.insert({_id: 2, name: 'john', age: 40})); // Test case 2
-assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 45})); // Test case 2
-assert.commandWorked(coll.insert({_id: 4, name: 'robert', age: 50})); // Test case 2
-assert.commandWorked(coll.insert({_id: 5, name: 'james', age: 55})); // Test case 3
-assert.commandWorked(coll.insert({_id: 6, name: 'liam', age: 60})); // Test case 4
+const mongosColl = st.s.getCollection(collNS);
+assert.commandWorked(mongosColl.insert({_id: -2, name: 'emma', age: 20})); // Test case 4
+assert.commandWorked(mongosColl.insert({_id: -1, name: 'olivia', age: 25})); // Test case 3
+assert.commandWorked(mongosColl.insert({_id: 0, name: 'matt', age: 30})); // Test case 1
+assert.commandWorked(mongosColl.insert({_id: 1, name: 'matt', age: 35})); // Test case 1
+assert.commandWorked(mongosColl.insert({_id: 2, name: 'john', age: 40})); // Test case 2
+assert.commandWorked(mongosColl.insert({_id: 3, name: 'robert', age: 45})); // Test case 2
+assert.commandWorked(mongosColl.insert({_id: 4, name: 'robert', age: 50})); // Test case 2
+assert.commandWorked(mongosColl.insert({_id: 5, name: 'james', age: 55})); // Test case 3
+assert.commandWorked(mongosColl.insert({_id: 6, name: 'liam', age: 60})); // Test case 4
// Move the chunk to the second shard leaving orphaned documents on the first shard.
assert.commandWorked(st.s.adminCommand({split: collNS, middle: {_id: 0}}));
@@ -70,7 +70,7 @@ assert.commandWorked(
st.s.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));
// Setup a change stream on the collection to receive real-time events on any data changes.
-const changeStream = coll.watch([]);
+const changeStream = mongosColl.watch([]);
////////////////////////////////////////////////////////////////////////////////////////////////////
// Test case 1: Direct operations to shard on orphaned documents
@@ -146,7 +146,7 @@ jsTest.log('A direct delete to a shard of multi-documents does not generate dele
jsTest.log('A broadcasted update of a single document generates an update event');
{
// Send a broadcasted update (query on non-key field) on a single document to all the shards.
- assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 41}}, {multi: true}));
+ assert.commandWorked(mongosColl.update({name: 'john'}, {$set: {age: 41}}, {multi: true}));
// The document is hosted by the second shard and the update event is notified. The first shard
// still hosts the orphaned document so no additional event must be notified.
@@ -163,7 +163,7 @@ jsTest.log('A broadcasted update of a single document generates an update event'
jsTest.log('A broadcasted delete of a single document generates a delete event');
{
// Send a broadcasted delete (query on non-key field) on a single document to all the shards.
- assert.commandWorked(coll.remove({name: 'john'}));
+ assert.commandWorked(mongosColl.remove({name: 'john'}));
// The document is hosted by the second shard and the delete event is notified. The first shard
// still hosts the orphaned document so no additional event must be notified.
@@ -180,7 +180,7 @@ jsTest.log('A broadcasted delete of a single document generates a delete event')
jsTest.log('A broadcasted update of multi-documents generates more update events');
{
// Send a broadcasted update (query on non-key field) on two documents to all the shards.
- assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 46}}, {multi: true}));
+ assert.commandWorked(mongosColl.update({name: 'robert'}, {$set: {age: 46}}, {multi: true}));
// The documents are hosted by the second shard and two delete events are notified. The first
// shard still hosts the orphaned documents so no additional event must be notified.
@@ -201,7 +201,7 @@ jsTest.log('A broadcasted update of multi-documents generates more update events
jsTest.log('A broadcasted delete of multi-documents generates more delete events');
{
// Send a broadcasted delete (query on non-key field) on two documents to all the shards.
- assert.commandWorked(coll.remove({name: 'robert'}));
+ assert.commandWorked(mongosColl.remove({name: 'robert'}));
// The documents are hosted by the second shard and two delete events are notified. The first
// shard still hosts the orphaned documents so no additional event must be notified.
@@ -296,14 +296,10 @@ jsTest.log('Direct updates (via a transaction to a shard) of both orphaned and o
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
- // The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one
- // update event is notified.
- // TODO (SERVER-65859): The second update event will be filtered out when the ticket is
- // completed.
+ // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
+ // one update event is notified.
assert.soon(() => changeStream.hasNext(), 'A first update event is expected');
assert.eq(changeStream.next().operationType, 'update');
- assert.soon(() => changeStream.hasNext(), 'A second update event is expected');
- assert.eq(changeStream.next().operationType, 'update');
assertNoChanges(changeStream);
// Both orphaned (liam) and owned (emma) documents on the shard have been updated.
@@ -324,14 +320,10 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
- // The shard hosts both orphaned (liam) and owned (emma) documents. Consequently, only one
- // update event is notified.
- // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is
- // completed.
+ // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
+ // one update event is notified.
assert.soon(() => changeStream.hasNext(), 'A first delete event is expected');
assert.eq(changeStream.next().operationType, 'delete');
- assert.soon(() => changeStream.hasNext(), 'A second delete event is expected');
- assert.eq(changeStream.next().operationType, 'delete');
assertNoChanges(changeStream);
// Both orphaned (liam) and owned (emma) documents on the shard have been removed.
@@ -343,7 +335,7 @@ jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and o
jsTest.log('The collection drop generates a drop event');
{
- coll.drop();
+ mongosColl.drop();
// Essentially, this verifies that the operation before dropping the collection did not notify
// additional and unexpected events.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 40603a0b1d7..05bfcff4df7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -63,6 +63,13 @@ std::unique_ptr<MatchExpression> buildUnwindTransactionFilter(
// filtered out by the default 'ns' filter this stage gets initialized with.
auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr));
+ // To correctly handle filtering out entries of direct write operations on orphaned documents,
+ // we include a filter for "fromMigrate" flagged operations, unless "fromMigrate" events are
+ // explicitly requested in the spec.
+ if (!expCtx->changeStreamSpec->getShowMigrationEvents()) {
+ unwindFilter->add(buildNotFromMigrateFilter(expCtx, userMatch));
+ }
+
// Attempt to rewrite the user's filter and combine it with the standard operation filter. We do
// this separately because we need to exclude certain fields from the user's filters. Unwound
// transaction events do not have these fields until we populate them from the commitTransaction