summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntonio Fuschetto <antonio.fuschetto@mongodb.com>2022-05-06 16:51:14 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-12 10:21:06 +0000
commit8c9ceb09f28bdecbfc896994f45f9a69f1011eca (patch)
tree53784356dcf0e4008da59b8d727b27a066b72329
parent7dc5d8f8ef8cb4f7a1c188f2200dbe3dd380b8f6 (diff)
downloadmongo-8c9ceb09f28bdecbfc896994f45f9a69f1011eca.tar.gz
SERVER-65858 Mark the applyOps entries when the operations involve orphaned documents
-rw-r--r--jstests/sharding/change_stream_no_orphans.js271
-rw-r--r--jstests/sharding/internal_txns/retry_on_transient_error_basic.js4
-rw-r--r--src/mongo/db/op_observer_impl.cpp23
-rw-r--r--src/mongo/db/repl/oplog.cpp2
-rw-r--r--src/mongo/db/repl/oplog_entry.h9
-rw-r--r--src/mongo/db/repl/oplog_entry.idl8
6 files changed, 259 insertions, 58 deletions
diff --git a/jstests/sharding/change_stream_no_orphans.js b/jstests/sharding/change_stream_no_orphans.js
index e835d75a72f..98d5fd1bcd6 100644
--- a/jstests/sharding/change_stream_no_orphans.js
+++ b/jstests/sharding/change_stream_no_orphans.js
@@ -3,10 +3,11 @@
* streams and (2) have no effect on the persisted data.
*
* The behavior is tested in the following scenarios:
- * - Direct update/delete (from shard) to orphaned document
- * - Distributed update/delete (from router) on a single document
- * - Distributed update/delete (from router) to multi-documents
- * - Distributed update/delete (from router) via transaction on multi-documents
+ * - Direct operations to shard on orphaned documents
+ * - Broadcasted operations (from router) on orphaned documents
+ * - Transaction from router updating both orphaned and non-orphaned documents
+ * - Transaction to shard updating both orphaned and non-orphaned documents
+ * - Batched deletes from router and to shard
*
* @tags: [
* requires_fcv_53,
@@ -41,12 +42,14 @@ assert.commandWorked(
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}}));
const coll = st.s.getCollection(collNS);
-assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 20}));
-assert.commandWorked(coll.insert({_id: 1, name: 'john', age: 25}));
-assert.commandWorked(coll.insert({_id: 2, name: 'robert', age: 30}));
-assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 35}));
-assert.commandWorked(coll.insert({_id: 4, name: 'james', age: 40}));
-assert.commandWorked(coll.insert({_id: 5, name: 'james', age: 45}));
+assert.commandWorked(coll.insert({_id: -2, name: 'emma', age: 20}));
+assert.commandWorked(coll.insert({_id: -1, name: 'olivia', age: 25}));
+assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 30}));
+assert.commandWorked(coll.insert({_id: 1, name: 'john', age: 35}));
+assert.commandWorked(coll.insert({_id: 2, name: 'robert', age: 40}));
+assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 45}));
+assert.commandWorked(coll.insert({_id: 4, name: 'james', age: 50}));
+assert.commandWorked(coll.insert({_id: 5, name: 'liam', age: 55}));
// Move the chunk to the second shard leaving orphaned documents on the first shard.
assert.commandWorked(st.s.adminCommand({split: collNS, middle: {_id: 0}}));
@@ -56,10 +59,14 @@ assert.commandWorked(
// Setup a change stream on the collection to receive real-time events on any data changes.
const changeStream = coll.watch([]);
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Direct operations to shard on orphaned documents
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
jsTest.log('A direct insert to a shard of an orphaned document does not generate an insert event');
{
// Direct insert to first shard of an orphaned document.
- assert.commandWorked(st.shard0.getCollection(collNS).insert({_id: 6, name: 'ken', age: 50}));
+ assert.commandWorked(st.shard0.getCollection(collNS).insert({_id: 6, name: 'ken', age: 60}));
// No event is notified.
assert(!changeStream.hasNext());
@@ -68,19 +75,19 @@ jsTest.log('A direct insert to a shard of an orphaned document does not generate
assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 6}));
}
-jsTest.log('A direct update to a shard on an orphaned document generates an update event');
+jsTest.log('A direct update to a shard of an orphaned document does not generate an update event');
{
// Send a direct update to first shard on an orphaned document.
- assert.commandWorked(st.shard0.getCollection(collNS).update({name: 'matt'}, {$set: {age: 21}}));
+ assert.commandWorked(st.shard0.getCollection(collNS).update({name: 'matt'}, {$set: {age: 31}}));
// No change stream event is generated.
assert(!changeStream.hasNext());
// The orphaned document on first shard has been updated.
- assert.eq(21, st.shard0.getCollection(collNS).findOne({_id: 0}).age);
+ assert.eq(31, st.shard0.getCollection(collNS).findOne({_id: 0}).age);
}
-jsTest.log('A direct delete to a shard on an orphaned document generates an update event');
+jsTest.log('A direct delete to a shard of an orphaned document does generate a delete event');
{
// Send a direct delete to first shard on an orphaned document.
assert.commandWorked(st.shard0.getCollection(collNS).remove({name: 'matt'}));
@@ -92,10 +99,14 @@ jsTest.log('A direct delete to a shard on an orphaned document generates an upda
assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: 0}));
}
-jsTest.log('A distributed update on a single document generates an update event');
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Broadcasted operations (from router) on orphaned documents
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+jsTest.log('A broadcasted update of a single document generates an update event');
{
- // Send a distributed update (query on non-key field) on a single document to all the shards.
- assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 26}}, {multi: true}));
+ // Send a broadcasted update (query on non-key field) on a single document to all the shards.
+ assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 36}}, {multi: true}));
// The document is hosted by the second shard and the update event is notified. The first shard
// still hosts the orphaned document so no additional event must be notified.
@@ -105,13 +116,13 @@ jsTest.log('A distributed update on a single document generates an update event'
// The orphaned document on first shard has not been updated, unlike the non-orphaned one on the
// second shard.
- assert.eq(25, st.shard0.getCollection(collNS).findOne({_id: 1}).age);
- assert.eq(26, st.shard1.getCollection(collNS).findOne({_id: 1}).age);
+ assert.eq(35, st.shard0.getCollection(collNS).findOne({_id: 1}).age);
+ assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 1}).age);
}
-jsTest.log('A distributed delete on a single document generates a delete event');
+jsTest.log('A broadcasted delete of a single document generates a delete event');
{
- // Send a distributed delete (query on non-key field) on a single document to all the shards.
+ // Send a broadcasted delete (query on non-key field) on a single document to all the shards.
assert.commandWorked(coll.remove({name: 'john'}));
// The document is hosted by the second shard and the delete event is notified. The first shard
@@ -126,10 +137,10 @@ jsTest.log('A distributed delete on a single document generates a delete event')
assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 1}));
}
-jsTest.log('A distributed update on multi-documents generates more update events');
+jsTest.log('A broadcasted update of multi-documents generates more update events');
{
- // Send a distributed update (query on non-key field) on two documents to all the shards.
- assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 36}}, {multi: true}));
+ // Send a broadcasted update (query on non-key field) on two documents to all the shards.
+ assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 41}}, {multi: true}));
// The documents are hosted by the second shard and two update events are notified. The first
// shard still hosts the orphaned documents so no additional event must be notified.
@@ -141,15 +152,15 @@ jsTest.log('A distributed update on multi-documents generates more update events
// The orphaned documents on first shard have not been updated, unlike the non-orphaned ones on
// the second shard.
- assert.eq(30, st.shard0.getCollection(collNS).findOne({_id: 2}).age);
- assert.eq(35, st.shard0.getCollection(collNS).findOne({_id: 3}).age);
- assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 2}).age);
- assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 3}).age);
+ assert.eq(40, st.shard0.getCollection(collNS).findOne({_id: 2}).age);
+ assert.eq(45, st.shard0.getCollection(collNS).findOne({_id: 3}).age);
+ assert.eq(41, st.shard1.getCollection(collNS).findOne({_id: 2}).age);
+ assert.eq(41, st.shard1.getCollection(collNS).findOne({_id: 3}).age);
}
-jsTest.log('A distributed delete on multi-documents generates more delete events');
+jsTest.log('A broadcasted delete of multi-documents generates more delete events');
{
- // Send a distributed delete (query on non-key field) on two documents to all the shards.
+ // Send a broadcasted delete (query on non-key field) on two documents to all the shards.
assert.commandWorked(coll.remove({name: 'robert'}));
// The documents are hosted by the second shard and two delete events are notified. The first
@@ -168,62 +179,132 @@ jsTest.log('A distributed delete on multi-documents generates more delete events
assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 3}));
}
-jsTest.log('A distributed update via transaction on multi-documents generates more update events');
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Transaction from router updating both orphaned and non-orphaned documents
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+jsTest.log('Broadcasted updates (via a transaction through the router) of both orphaned and ' +
+ 'non-orphaned documents generate events only for operations on non-orphaned documents');
{
- // Send a distributed update (query on non-key field) via transaction on two documents to all
- // the shards.
+ // Send a broadcasted transaction to the router updating both orphaned and non-orphaned
+ // documents.
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl = sessionDB.getCollection(collName);
session.startTransaction();
- assert.commandWorked(sessionColl.update({name: 'james'}, {$set: {age: 46}}, {multi: true}));
+ assert.commandWorked(sessionColl.update({name: 'olivia'}, {$set: {age: 26}}, {multi: true}));
+ assert.commandWorked(sessionColl.update({name: 'james'}, {$set: {age: 51}}, {multi: true}));
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
- // The documents are hosted by the second shard and two delete events are notified. The first
- // shard still hosts the orphaned documents so no additional event must be notified.
+ // The primary shard hosts orphaned (james) and non-orphaned (olivia) documents, whereas the
+ // second shard hosts a non-orphaned document (james). Consequently, two update events are
+ // notified.
assert.soon(() => changeStream.hasNext(), 'A first update event is expected');
assert.eq(changeStream.next().operationType, 'update');
assert.soon(() => changeStream.hasNext(), 'A second update event is expected');
assert.eq(changeStream.next().operationType, 'update');
assert(!changeStream.hasNext());
- // The orphaned documents on first shard have not been updated, unlike the non-orphaned ones on
- // the second shard.
- assert.eq(40, st.shard0.getCollection(collNS).findOne({_id: 4}).age);
- assert.eq(45, st.shard0.getCollection(collNS).findOne({_id: 5}).age);
- assert.eq(46, st.shard1.getCollection(collNS).findOne({_id: 4}).age);
- assert.eq(46, st.shard1.getCollection(collNS).findOne({_id: 5}).age);
+ // The orphaned document on first shard (james) has not been updated, unlike the non-orphaned
+ // ones on both primary and second shards (olivia and james).
+ assert.eq(26, st.shard0.getCollection(collNS).findOne({_id: -1}).age);
+ assert.eq(50, st.shard0.getCollection(collNS).findOne({_id: 4}).age);
+ assert.eq(51, st.shard1.getCollection(collNS).findOne({_id: 4}).age);
}
-jsTest.log('A distributed delete via transaction on multi-documents generates more delete events');
+jsTest.log('Broadcasted deletes (via a transaction through the router) of both orphaned and ' +
+ 'non-orphaned documents generate events only for operations on non-orphaned documents');
{
- // Send a distributed delete (query on non-key field) via transaction on two documents to all
- // the shards.
+ // Send a broadcasted transaction to the router deleting both orphaned and non-orphaned
+ // documents.
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl = sessionDB.getCollection(collName);
session.startTransaction();
+ assert.commandWorked(sessionColl.remove({name: 'olivia'}));
assert.commandWorked(sessionColl.remove({name: 'james'}));
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
- // The documents are hosted by the second shard and two delete events are notified. The first
- // shard still hosts the orphaned documents so no additional event must be notified.
+ // The primary shard hosts orphaned (james) and non-orphaned (olivia) documents, whereas the
+ // second shard hosts a non-orphaned document (james). Consequently, two delete events are
+ // notified.
assert.soon(() => changeStream.hasNext(), 'A first delete event is expected');
assert.eq(changeStream.next().operationType, 'delete');
assert.soon(() => changeStream.hasNext(), 'A second delete event is expected');
assert.eq(changeStream.next().operationType, 'delete');
assert(!changeStream.hasNext());
- // The orphaned documents on first shard have not been removed, unlike the non-orphaned ones on
- // the second shard.
+ // The orphaned document on first shard (james) has not been removed, unlike the non-orphaned
+ // ones on both primary and second shards (olivia and james).
+ assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: -1}));
assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 4}));
- assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 5}));
assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 4}));
- assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 5}));
}
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Transaction to shard updating both orphaned and non-orphaned documents
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+jsTest.log('Direct updates (via a transaction to a shard) of both orphaned and non-orphaned ' +
+ 'documents generate events only for operations on non-orphaned documents');
+{
+ // Send a direct transaction to a shard updating both orphaned and non-orphaned documents.
+ const session = st.rs0.getPrimary().getDB(dbName).getMongo().startSession();
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB.getCollection(collName);
+ session.startTransaction();
+ assert.commandWorked(sessionColl.update({name: 'emma'}, {$set: {age: 21}}, {multi: true}));
+ assert.commandWorked(sessionColl.update({name: 'liam'}, {$set: {age: 56}}, {multi: true}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+    // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
+    // one update event should be notified.
+ // TODO (SERVER-65859): The second update event will be filtered out when the ticket is
+ // completed.
+ assert.soon(() => changeStream.hasNext(), 'A first update event is expected');
+ assert.eq(changeStream.next().operationType, 'update');
+ assert.soon(() => changeStream.hasNext(), 'A second update event is expected');
+ assert.eq(changeStream.next().operationType, 'update');
+ assert(!changeStream.hasNext());
+
+ // Both orphaned (liam) and non-orphaned (emma) documents on the shard have been updated.
+ assert.eq(21, st.shard0.getCollection(collNS).findOne({_id: -2}).age);
+ assert.eq(56, st.shard0.getCollection(collNS).findOne({_id: 5}).age);
+}
+
+jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and non-orphaned ' +
+ 'documents generate events only for operations on non-orphaned documents');
+{
+ // Send a direct transaction to a shard deleting both orphaned and non-orphaned documents.
+ const session = st.rs0.getPrimary().getDB(dbName).getMongo().startSession();
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB.getCollection(collName);
+ session.startTransaction();
+ assert.commandWorked(sessionColl.remove({name: 'emma'}));
+ assert.commandWorked(sessionColl.remove({name: 'liam'}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+
+    // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only
+    // one delete event should be notified.
+ // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is
+ // completed.
+ assert.soon(() => changeStream.hasNext(), 'A first delete event is expected');
+ assert.eq(changeStream.next().operationType, 'delete');
+ assert.soon(() => changeStream.hasNext(), 'A second delete event is expected');
+ assert.eq(changeStream.next().operationType, 'delete');
+ assert(!changeStream.hasNext());
+
+ // Both orphaned (liam) and non-orphaned (emma) documents on the shard have been removed.
+ assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: -2}));
+ assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: 5}));
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
jsTest.log('The collection drop generates a drop event');
{
coll.drop();
@@ -234,6 +315,92 @@ jsTest.log('The collection drop generates a drop event');
assert.eq(changeStream.next().operationType, 'drop');
}
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Batched deletes from router and to shard
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+// Set the database to use batched deletes.
+const db2 = st.rs0.getPrimary().getDB(dbName);
+assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0}));
+assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: 2}));
+
+// Create a non-sharded collection.
+const coll2 = db2.getCollection(collName);
+
+// Setup a change stream on the collection to receive real-time events on any data changes.
+const changeStream2 = coll2.watch([]);
+
+jsTest.log('A batched delete from router generates only one delete event');
+{
+ // Insert two documents in the collection (see 'batchedDeletesTargetBatchDocs') and skip the
+ // generated events.
+ assert.commandWorked(coll2.insert({_id: 0, name: 'volkswagen'}));
+ assert.commandWorked(coll2.insert({_id: 1, name: 'renault'}));
+ assert.soon(() => changeStream2.hasNext(), 'A first insert event is expected');
+ assert.eq(changeStream2.next().operationType, 'insert');
+ assert.soon(() => changeStream2.hasNext(), 'A second insert event is expected');
+ assert.eq(changeStream2.next().operationType, 'insert');
+ assert(!changeStream2.hasNext());
+
+ // Delete all documents in batch from the collection.
+ assert.commandWorked(coll2.deleteMany({_id: {$gte: 0}}));
+
+    // Actually only one delete operation is performed. Consequently, only one delete event
+    // should be notified.
+ // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is
+ // completed.
+ assert.soon(() => changeStream2.hasNext(), 'A first delete event is expected');
+ assert.eq(changeStream2.next().operationType, 'delete');
+ assert.soon(() => changeStream2.hasNext(), 'A second delete event is expected for now');
+ assert.eq(changeStream2.next().operationType, 'delete');
+ assert(!changeStream2.hasNext());
+
+ // All documents have been removed from the collection.
+ assert.eq(0, coll2.find().itcount());
+}
+
+jsTest.log('A batched delete to shard generates only one delete event');
+{
+ // Insert two documents in the collection (see 'batchedDeletesTargetBatchDocs') and skip the
+ // generated events.
+ assert.commandWorked(coll2.insert({_id: 0, name: 'volkswagen'}));
+ assert.commandWorked(coll2.insert({_id: 1, name: 'renault'}));
+ assert.soon(() => changeStream2.hasNext(), 'A first insert event is expected');
+ assert.eq(changeStream2.next().operationType, 'insert');
+ assert.soon(() => changeStream2.hasNext(), 'A second insert event is expected');
+ assert.eq(changeStream2.next().operationType, 'insert');
+ assert(!changeStream2.hasNext());
+
+ // Delete all documents in batch from the collection.
+ assert.commandWorked(st.shard0.getCollection(collNS).deleteMany({_id: {$gte: 0}}));
+
+    // Actually only one delete operation is performed. Consequently, only one delete event
+    // should be notified.
+ // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is
+ // completed.
+ assert.soon(() => changeStream2.hasNext(), 'A first delete event is expected');
+ assert.eq(changeStream2.next().operationType, 'delete');
+ assert.soon(() => changeStream2.hasNext(), 'A second delete event is expected for now');
+ assert.eq(changeStream2.next().operationType, 'delete');
+ assert(!changeStream2.hasNext());
+
+ // All documents have been removed from the collection.
+ assert.eq(0, coll2.find().itcount());
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+jsTest.log('The collection drop generates a drop event');
+{
+ coll2.drop();
+
+ // Essentially, this verifies that the operation before dropping the collection did not notify
+ // additional and unexpected events.
+ assert.soon(() => changeStream2.hasNext(), 'A drop event is expected');
+ assert.eq(changeStream2.next().operationType, 'drop');
+}
+
suspendRangeDeletionShard0.off();
st.stop();
diff --git a/jstests/sharding/internal_txns/retry_on_transient_error_basic.js b/jstests/sharding/internal_txns/retry_on_transient_error_basic.js
index b417d5a545b..2bc6e61425f 100644
--- a/jstests/sharding/internal_txns/retry_on_transient_error_basic.js
+++ b/jstests/sharding/internal_txns/retry_on_transient_error_basic.js
@@ -145,6 +145,10 @@ function testPersistence(shardRst, lsid, txnNumber, txnDocFilter, oplogEntryFilt
const txnRetryCounter1 = NumberInt(1);
let db = shardRst.getPrimary().getDB(kDbName);
+ // Preload the collection metadata to avoid repeating the insert command if it fails due to
+ // StaleConfig error.
+ assert.commandWorked(db.adminCommand({_flushRoutingTableCacheUpdates: kNs}));
+
const insertCmdObj = {
insert: kCollName,
documents: [{x: 0}],
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 8e3c1c93b7d..6b7bd4ef6a8 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/keys_collection_document_gen.h"
@@ -65,6 +66,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -513,6 +515,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
if (inMultiDocumentTransaction) {
+ invariant(!fromMigrate);
+
// Do not add writes to the profile collection to the list of transaction operations, since
// these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
// in a SideTransactionBlock.
@@ -523,6 +527,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
const bool inRetryableInternalTransaction =
isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+ write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss);
for (auto iter = first; iter != last; iter++) {
const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId();
@@ -532,6 +537,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
+
+ if (!OperationShardingState::isComingFromRouter(opCtx) &&
+ preWriteFilter.computeAction(Document(iter->doc)) ==
+ write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) {
+ LOGV2_DEBUG(6585801,
+ 3,
+ "Marking insert operation of orphan document with the 'fromMigrate' "
+ "flag to prevent a wrong change stream event",
+ "namespace"_attr = nss,
+ "document"_attr = iter->doc);
+
+ operation.setFromMigrate(true);
+ }
+
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
@@ -697,7 +716,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
-
+ operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
@@ -839,6 +858,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
if (inBatchedWrite) {
auto operation =
MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+ operation.setFromMigrateIfTrue(args.fromMigrate);
batchedWriteContext.addBatchedOperation(opCtx, operation);
} else if (inMultiDocumentTransaction) {
const bool inRetryableInternalTransaction =
@@ -897,6 +917,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
+ operation.setFromMigrateIfTrue(args.fromMigrate);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0ea5095efee..b684de939ec 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -551,7 +551,7 @@ std::vector<OpTime> logInsertOps(
"namespace"_attr = nss,
"document"_attr = begin[i].doc);
- oplogEntry.setFromMigrateIfTrue(true);
+ oplogEntry.setFromMigrate(true);
}
appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtIds);
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 87a946fe634..2eb195c079a 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -189,6 +189,11 @@ public:
return variant_util::toVector<StmtId>(DurableReplOperation::getStatementIds());
}
+ void setFromMigrateIfTrue(bool value) & {
+ if (value)
+ setFromMigrate(value);
+ }
+
private:
BSONObj _preImageDocumentKey;
@@ -339,6 +344,10 @@ public:
*/
OpTime getOpTime() const;
+ void setFromMigrate(bool value) & {
+ getDurableReplOperation().setFromMigrate(value);
+ }
+
/**
* Same as setFromMigrate but only set when it is true.
*/
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 630cb67aef9..7c1ba09f320 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -127,6 +127,10 @@ structs:
optional: true
description: "Identifier of the transaction statement(s) which generated this oplog
entry"
+ fromMigrate:
+ type: bool
+ optional: true
+ description: "An operation caused by a chunk migration"
OplogEntryBase:
description: A document in which the server stores an oplog entry.
@@ -150,10 +154,6 @@ structs:
cpp_name: wallClockTime
type: date
description: "A wallclock time with MS resolution"
- fromMigrate:
- type: bool
- optional: true
- description: "An operation caused by a chunk migration"
fromTenantMigration:
type: uuid
optional: true