From 8c9ceb09f28bdecbfc896994f45f9a69f1011eca Mon Sep 17 00:00:00 2001 From: Antonio Fuschetto Date: Fri, 6 May 2022 16:51:14 +0000 Subject: SERVER-65858 Mark the applyOps entries when the operations involves orphaned documents --- jstests/sharding/change_stream_no_orphans.js | 271 +++++++++++++++++---- .../retry_on_transient_error_basic.js | 4 + src/mongo/db/op_observer_impl.cpp | 23 +- src/mongo/db/repl/oplog.cpp | 2 +- src/mongo/db/repl/oplog_entry.h | 9 + src/mongo/db/repl/oplog_entry.idl | 8 +- 6 files changed, 259 insertions(+), 58 deletions(-) diff --git a/jstests/sharding/change_stream_no_orphans.js b/jstests/sharding/change_stream_no_orphans.js index e835d75a72f..98d5fd1bcd6 100644 --- a/jstests/sharding/change_stream_no_orphans.js +++ b/jstests/sharding/change_stream_no_orphans.js @@ -3,10 +3,11 @@ * streams and (2) have no effect on the persisted data. * * The behavior is tested in the following scenarios: - * - Direct update/delete (from shard) to orphaned document - * - Distributed update/delete (from router) on a single document - * - Distributed update/delete (from router) to multi-documents - * - Distributed update/delete (from router) via transaction on multi-documents + * - Direct operations to shard on orphaned documents + * - Broadcasted operations (from router) on orphaned documents + * - Transaction from router updating both orphaned and non-orphaned documents + * - Transaction to shard updating both orphaned and non-orphaned documents + * - Batched deletes from router and to shard * * @tags: [ * requires_fcv_53, @@ -41,12 +42,14 @@ assert.commandWorked( st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName})); assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}})); const coll = st.s.getCollection(collNS); -assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 20})); -assert.commandWorked(coll.insert({_id: 1, name: 'john', age: 25})); -assert.commandWorked(coll.insert({_id: 2, name: 'robert', age: 30})); -assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 35})); -assert.commandWorked(coll.insert({_id: 4, name: 'james', age: 40})); -assert.commandWorked(coll.insert({_id: 5, name: 'james', age: 45})); +assert.commandWorked(coll.insert({_id: -2, name: 'emma', age: 20})); +assert.commandWorked(coll.insert({_id: -1, name: 'olivia', age: 25})); +assert.commandWorked(coll.insert({_id: 0, name: 'matt', age: 30})); +assert.commandWorked(coll.insert({_id: 1, name: 'john', age: 35})); +assert.commandWorked(coll.insert({_id: 2, name: 'robert', age: 40})); +assert.commandWorked(coll.insert({_id: 3, name: 'robert', age: 45})); +assert.commandWorked(coll.insert({_id: 4, name: 'james', age: 50})); +assert.commandWorked(coll.insert({_id: 5, name: 'liam', age: 55})); // Move the chunk to the second shard leaving orphaned documents on the first shard. assert.commandWorked(st.s.adminCommand({split: collNS, middle: {_id: 0}})); @@ -56,10 +59,14 @@ assert.commandWorked( // Setup a change stream on the collection to receive real-time events on any data changes. const changeStream = coll.watch([]); +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Direct operations to shard on orphaned documents +//////////////////////////////////////////////////////////////////////////////////////////////////// + jsTest.log('A direct insert to a shard of an orphaned document does not generate an insert event'); { // Direct insert to first shard of an orphaned document. - assert.commandWorked(st.shard0.getCollection(collNS).insert({_id: 6, name: 'ken', age: 50})); + assert.commandWorked(st.shard0.getCollection(collNS).insert({_id: 6, name: 'ken', age: 60})); // No event is notified. assert(!changeStream.hasNext()); @@ -68,19 +75,19 @@ jsTest.log('A direct insert to a shard of an orphaned document does not generate assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 6})); } -jsTest.log('A direct update to a shard on an orphaned document generates an update event'); +jsTest.log('A direct update to a shard of an orphaned document does not generate an update event'); { // Send a direct update to first shard on an orphaned document. - assert.commandWorked(st.shard0.getCollection(collNS).update({name: 'matt'}, {$set: {age: 21}})); + assert.commandWorked(st.shard0.getCollection(collNS).update({name: 'matt'}, {$set: {age: 31}})); // No change stream event is generated. assert(!changeStream.hasNext()); // The orphaned document on first shard has been updated. - assert.eq(21, st.shard0.getCollection(collNS).findOne({_id: 0}).age); + assert.eq(31, st.shard0.getCollection(collNS).findOne({_id: 0}).age); } -jsTest.log('A direct delete to a shard on an orphaned document generates an update event'); +jsTest.log('A direct delete to a shard of an orphaned document does generate an update event'); { // Send a direct delete to first shard on an orphaned document. assert.commandWorked(st.shard0.getCollection(collNS).remove({name: 'matt'})); @@ -92,10 +99,14 @@ jsTest.log('A direct delete to a shard on an orphaned document generates an upda assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: 0})); } -jsTest.log('A distributed update on a single document generates an update event'); +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Broadcasted operations (from router) on orphaned documents +//////////////////////////////////////////////////////////////////////////////////////////////////// + +jsTest.log('A broadcasted update of a single document generates an update event'); { - // Send a distributed update (query on non-key field) on a single document to all the shards. - assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 26}}, {multi: true})); + // Send a broadcasted update (query on non-key field) on a single document to all the shards. + assert.commandWorked(coll.update({name: 'john'}, {$set: {age: 36}}, {multi: true})); // The document is hosted by the second shard and the update event is notified. The first shard // still hosts the orphaned document so no additional event must be notified. @@ -105,13 +116,13 @@ jsTest.log('A distributed update on a single document generates an update event' // The orphaned document on first shard has not been updated, unlike the non-orphaned one on the // second shard. - assert.eq(25, st.shard0.getCollection(collNS).findOne({_id: 1}).age); - assert.eq(26, st.shard1.getCollection(collNS).findOne({_id: 1}).age); + assert.eq(35, st.shard0.getCollection(collNS).findOne({_id: 1}).age); + assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 1}).age); } -jsTest.log('A distributed delete on a single document generates a delete event'); +jsTest.log('A broadcasted delete of a single document generates a delete event'); { - // Send a distributed delete (query on non-key field) on a single document to all the shards. + // Send a broadcasted delete (query on non-key field) on a single document to all the shards. assert.commandWorked(coll.remove({name: 'john'})); // The document is hosted by the second shard and the delete event is notified. The first shard @@ -126,10 +137,10 @@ jsTest.log('A distributed delete on a single document generates a delete event') assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 1})); } -jsTest.log('A distributed update on multi-documents generates more update events'); +jsTest.log('A broadcasted update of multi-documents generates more update events'); { - // Send a distributed update (query on non-key field) on two documents to all the shards. - assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 36}}, {multi: true})); + // Send a broadcasted update (query on non-key field) on two documents to all the shards. + assert.commandWorked(coll.update({name: 'robert'}, {$set: {age: 41}}, {multi: true})); // The documents are hosted by the second shard and two delete events are notified. The first // shard still hosts the orphaned documents so no additional event must be notified. @@ -141,15 +152,15 @@ jsTest.log('A distributed update on multi-documents generates more update events // The orphaned documents on first shard have not been updated, unlike the non-orphaned ones on // the second shard. - assert.eq(30, st.shard0.getCollection(collNS).findOne({_id: 2}).age); - assert.eq(35, st.shard0.getCollection(collNS).findOne({_id: 3}).age); - assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 2}).age); - assert.eq(36, st.shard1.getCollection(collNS).findOne({_id: 3}).age); + assert.eq(40, st.shard0.getCollection(collNS).findOne({_id: 2}).age); + assert.eq(45, st.shard0.getCollection(collNS).findOne({_id: 3}).age); + assert.eq(41, st.shard1.getCollection(collNS).findOne({_id: 2}).age); + assert.eq(41, st.shard1.getCollection(collNS).findOne({_id: 3}).age); } -jsTest.log('A distributed delete on multi-documents generates more delete events'); +jsTest.log('A broadcasted delete of multi-documents generates more delete events'); { - // Send a distributed delete (query on non-key field) on two documents to all the shards. + // Send a broadcasted delete (query on non-key field) on two documents to all the shards. assert.commandWorked(coll.remove({name: 'robert'})); // The documents are hosted by the second shard and two delete events are notified. The first @@ -168,62 +179,132 @@ jsTest.log('A distributed delete on multi-documents generates more delete events assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 3})); } -jsTest.log('A distributed update via transaction on multi-documents generates more update events'); +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Transaction from router updating both orphaned and non-orphaned documents +//////////////////////////////////////////////////////////////////////////////////////////////////// + +jsTest.log('Broadcasted updates (via a transaction through the router) of both orphaned and ' + + 'non-orphaned documents generate events only for operations on non-orphaned documents'); { - // Send a distributed update (query on non-key field) via transaction on two documents to all - // the shards. + // Send a broadcasted transaction to the router updating both orphaned and non-orphaned + // documents. const session = st.s.startSession(); const sessionDB = session.getDatabase(dbName); const sessionColl = sessionDB.getCollection(collName); session.startTransaction(); - assert.commandWorked(sessionColl.update({name: 'james'}, {$set: {age: 46}}, {multi: true})); + assert.commandWorked(sessionColl.update({name: 'olivia'}, {$set: {age: 26}}, {multi: true})); + assert.commandWorked(sessionColl.update({name: 'james'}, {$set: {age: 51}}, {multi: true})); assert.commandWorked(session.commitTransaction_forTesting()); session.endSession(); - // The documents are hosted by the second shard and two delete events are notified. The first - // shard still hosts the orphaned documents so no additional event must be notified. + // The primary shard hosts orphaned (james) and non-orphaned (olivia) documents, whereas the + // second shard hosts a non-orphaned document (james). Consequently, two update events are + // notified. assert.soon(() => changeStream.hasNext(), 'A first update event is expected'); assert.eq(changeStream.next().operationType, 'update'); assert.soon(() => changeStream.hasNext(), 'A second update event is expected'); assert.eq(changeStream.next().operationType, 'update'); assert(!changeStream.hasNext()); - // The orphaned documents on first shard have not been updated, unlike the non-orphaned ones on - // the second shard. - assert.eq(40, st.shard0.getCollection(collNS).findOne({_id: 4}).age); - assert.eq(45, st.shard0.getCollection(collNS).findOne({_id: 5}).age); - assert.eq(46, st.shard1.getCollection(collNS).findOne({_id: 4}).age); - assert.eq(46, st.shard1.getCollection(collNS).findOne({_id: 5}).age); + // The orphaned document on first shard (james) has not been updated, unlike the non-orphaned + // ones on both primary and second shards (olivia and james). + assert.eq(26, st.shard0.getCollection(collNS).findOne({_id: -1}).age); + assert.eq(50, st.shard0.getCollection(collNS).findOne({_id: 4}).age); + assert.eq(51, st.shard1.getCollection(collNS).findOne({_id: 4}).age); } -jsTest.log('A distributed delete via transaction on multi-documents generates more delete events'); +jsTest.log('Broadcasted deletes (via a transaction through the router) of both orphaned and ' + + 'non-orphaned documents generate events only for operations on non-orphaned documents'); { - // Send a distributed delete (query on non-key field) via transaction on two documents to all - // the shards. + // Send a broadcasted transaction to the router deleting both orphaned and non-orphaned + // documents. const session = st.s.startSession(); const sessionDB = session.getDatabase(dbName); const sessionColl = sessionDB.getCollection(collName); session.startTransaction(); + assert.commandWorked(sessionColl.remove({name: 'olivia'})); assert.commandWorked(sessionColl.remove({name: 'james'})); assert.commandWorked(session.commitTransaction_forTesting()); session.endSession(); - // The documents are hosted by the second shard and two delete events are notified. The first - // shard still hosts the orphaned documents so no additional event must be notified. + // The primary shard hosts orphaned (james) and non-orphaned (olivia) documents, whereas the + // second shard hosts a non-orphaned document (james). Consequently, two delete events are + // notified. assert.soon(() => changeStream.hasNext(), 'A first delete event is expected'); assert.eq(changeStream.next().operationType, 'delete'); assert.soon(() => changeStream.hasNext(), 'A second delete event is expected'); assert.eq(changeStream.next().operationType, 'delete'); assert(!changeStream.hasNext()); - // The orphaned documents on first shard have not been removed, unlike the non-orphaned ones on - // the second shard. + // The orphaned document on first shard (james) has not been removed, unlike the non-orphaned + // ones on both primary and second shards (olivia and james). + assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: -1})); assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 4})); - assert.neq(null, st.shard0.getCollection(collNS).findOne({_id: 5})); assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 4})); - assert.eq(null, st.shard1.getCollection(collNS).findOne({_id: 5})); } +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Transaction to shard updating both orphaned and non-orphaned documents +//////////////////////////////////////////////////////////////////////////////////////////////////// + +jsTest.log('Direct updates (via a transaction to a shard) of both orphaned and non-orphaned' + + 'documents generate events only for operations on non-orphaned documents'); +{ + // Send a direct transaction to a shard updating both orphaned and non-orphaned documents. + const session = st.rs0.getPrimary().getDB(dbName).getMongo().startSession(); + const sessionDB = session.getDatabase(dbName); + const sessionColl = sessionDB.getCollection(collName); + session.startTransaction(); + assert.commandWorked(sessionColl.update({name: 'emma'}, {$set: {age: 21}}, {multi: true})); + assert.commandWorked(sessionColl.update({name: 'liam'}, {$set: {age: 56}}, {multi: true})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only + // one update event is notified. + // TODO (SERVER-65859): The second update event will be filtered out when the ticket is + // completed. + assert.soon(() => changeStream.hasNext(), 'A first update event is expected'); + assert.eq(changeStream.next().operationType, 'update'); + assert.soon(() => changeStream.hasNext(), 'A second update event is expected'); + assert.eq(changeStream.next().operationType, 'update'); + assert(!changeStream.hasNext()); + + // Both orphaned (liam) and non-orphaned (emma) documents on the shard have been updated. + assert.eq(21, st.shard0.getCollection(collNS).findOne({_id: -2}).age); + assert.eq(56, st.shard0.getCollection(collNS).findOne({_id: 5}).age); +} + +jsTest.log('Direct deletes (via a transaction to a shard) of both orphaned and non-orphaned' + + 'documents generate events only for operations on non-orphaned documents'); +{ + // Send a direct transaction to a shard deleting both orphaned and non-orphaned documents. + const session = st.rs0.getPrimary().getDB(dbName).getMongo().startSession(); + const sessionDB = session.getDatabase(dbName); + const sessionColl = sessionDB.getCollection(collName); + session.startTransaction(); + assert.commandWorked(sessionColl.remove({name: 'emma'})); + assert.commandWorked(sessionColl.remove({name: 'liam'})); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); + + // The shard hosts both orphaned (liam) and non-orphaned (emma) documents. Consequently, only + // one update event is notified. + // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is + // completed. + assert.soon(() => changeStream.hasNext(), 'A first delete event is expected'); + assert.eq(changeStream.next().operationType, 'delete'); + assert.soon(() => changeStream.hasNext(), 'A second delete event is expected'); + assert.eq(changeStream.next().operationType, 'delete'); + assert(!changeStream.hasNext()); + + // Both orphaned (liam) and non-orphaned (emma) documents on the shard have been removed. + assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: -2})); + assert.eq(null, st.shard0.getCollection(collNS).findOne({_id: 5})); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + jsTest.log('The collection drop generates a drop event'); { coll.drop(); @@ -234,6 +315,92 @@ jsTest.log('The collection drop generates a drop event'); assert.eq(changeStream.next().operationType, 'drop'); } +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Batched deletes from router and to shard +//////////////////////////////////////////////////////////////////////////////////////////////////// + +// Set the database to use batched deletes. +const db2 = st.rs0.getPrimary().getDB(dbName); +assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0})); +assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0})); +assert.commandWorked(db2.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: 2})); + +// Create a non-sharded collection. +const coll2 = db2.getCollection(collName); + +// Setup a change stream on the collection to receive real-time events on any data changes. +const changeStream2 = coll2.watch([]); + +jsTest.log('A batched delete from router generates only one delete event'); +{ + // Insert two documents in the collection (see 'batchedDeletesTargetBatchDocs') and skip the + // generated events. + assert.commandWorked(coll2.insert({_id: 0, name: 'volkswagen'})); + assert.commandWorked(coll2.insert({_id: 1, name: 'renault'})); + assert.soon(() => changeStream2.hasNext(), 'A first insert event is expected'); + assert.eq(changeStream2.next().operationType, 'insert'); + assert.soon(() => changeStream2.hasNext(), 'A second insert event is expected'); + assert.eq(changeStream2.next().operationType, 'insert'); + assert(!changeStream2.hasNext()); + + // Delete all documents in batch from the collection. + assert.commandWorked(coll2.deleteMany({_id: {$gte: 0}})); + + // Actually only one delete operation is performed. Consequently, only one delete event is + // notified. + // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is + // completed. + assert.soon(() => changeStream2.hasNext(), 'A first delete event is expected'); + assert.eq(changeStream2.next().operationType, 'delete'); + assert.soon(() => changeStream2.hasNext(), 'A second delete event is expected for now'); + assert.eq(changeStream2.next().operationType, 'delete'); + assert(!changeStream2.hasNext()); + + // All documents have been removed from the collection. + assert.eq(0, coll2.find().itcount()); +} + +jsTest.log('A batched delete to shard generates only one delete event'); +{ + // Insert two documents in the collection (see 'batchedDeletesTargetBatchDocs') and skip the + // generated events. + assert.commandWorked(coll2.insert({_id: 0, name: 'volkswagen'})); + assert.commandWorked(coll2.insert({_id: 1, name: 'renault'})); + assert.soon(() => changeStream2.hasNext(), 'A first insert event is expected'); + assert.eq(changeStream2.next().operationType, 'insert'); + assert.soon(() => changeStream2.hasNext(), 'A second insert event is expected'); + assert.eq(changeStream2.next().operationType, 'insert'); + assert(!changeStream2.hasNext()); + + // Delete all documents in batch from the collection. + assert.commandWorked(st.shard0.getCollection(collNS).deleteMany({_id: {$gte: 0}})); + + // Actually only one delete operation is performed. Consequently, only one delete event is + // notified. + // TODO (SERVER-65859): The second delete event will be filtered out when the ticket is + // completed. + assert.soon(() => changeStream2.hasNext(), 'A first delete event is expected'); + assert.eq(changeStream2.next().operationType, 'delete'); + assert.soon(() => changeStream2.hasNext(), 'A second delete event is expected for now'); + assert.eq(changeStream2.next().operationType, 'delete'); + assert(!changeStream2.hasNext()); + + // All documents have been removed from the collection. + assert.eq(0, coll2.find().itcount()); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +jsTest.log('The collection drop generates a drop event'); +{ + coll2.drop(); + + // Essentially, this verifies that the operation before dropping the collection did not notify + // additional and unexpected events. + assert.soon(() => changeStream2.hasNext(), 'A drop event is expected'); + assert.eq(changeStream2.next().operationType, 'drop'); +} + suspendRangeDeletionShard0.off(); st.stop(); diff --git a/jstests/sharding/internal_txns/retry_on_transient_error_basic.js b/jstests/sharding/internal_txns/retry_on_transient_error_basic.js index b417d5a545b..2bc6e61425f 100644 --- a/jstests/sharding/internal_txns/retry_on_transient_error_basic.js +++ b/jstests/sharding/internal_txns/retry_on_transient_error_basic.js @@ -145,6 +145,10 @@ function testPersistence(shardRst, lsid, txnNumber, txnDocFilter, oplogEntryFilt const txnRetryCounter1 = NumberInt(1); let db = shardRst.getPrimary().getDB(kDbName); + // Preload the collection metadata to avoid repeating the insert command if it fails due to + // StaleConfig error. + assert.commandWorked(db.adminCommand({_flushRoutingTableCacheUpdates: kNs})); + const insertCmdObj = { insert: kCollName, documents: [{x: 0}], diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 8e3c1c93b7d..6b7bd4ef6a8 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -48,6 +48,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/keys_collection_document_gen.h" @@ -65,6 +66,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog_mongod.h" @@ -513,6 +515,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); if (inMultiDocumentTransaction) { + invariant(!fromMigrate); + // Do not add writes to the profile collection to the list of transaction operations, since // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are // in a SideTransactionBlock. @@ -523,6 +527,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, const bool inRetryableInternalTransaction = isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss); for (auto iter = first; iter != last; iter++) { const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId(); @@ -532,6 +537,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(iter->doc)); + + if (!OperationShardingState::isComingFromRouter(opCtx) && + preWriteFilter.computeAction(Document(iter->doc)) == + write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) { + LOGV2_DEBUG(6585801, + 3, + "Marking insert operation of orphan document with the 'fromMigrate' " + "flag to prevent a wrong change stream event", + "namespace"_attr = nss, + "document"_attr = iter->doc); + + operation.setFromMigrate(true); + } + txnParticipant.addTransactionOperation(opCtx, operation); } } else { @@ -697,7 +716,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); - + operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate); txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; @@ -839,6 +858,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, if (inBatchedWrite) { auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); + operation.setFromMigrateIfTrue(args.fromMigrate); batchedWriteContext.addBatchedOperation(opCtx, operation); } else if (inMultiDocumentTransaction) { const bool inRetryableInternalTransaction = @@ -897,6 +917,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } operation.setDestinedRecipient(destinedRecipientDecoration(opCtx)); + operation.setFromMigrateIfTrue(args.fromMigrate); txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0ea5095efee..b684de939ec 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -551,7 +551,7 @@ std::vector logInsertOps( "namespace"_attr = nss, "document"_attr = begin[i].doc); - oplogEntry.setFromMigrateIfTrue(true); + oplogEntry.setFromMigrate(true); } appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtIds); diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 87a946fe634..2eb195c079a 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -189,6 +189,11 @@ public: return variant_util::toVector(DurableReplOperation::getStatementIds()); } + void setFromMigrateIfTrue(bool value) & { + if (value) + setFromMigrate(value); + } + private: BSONObj _preImageDocumentKey; @@ -339,6 +344,10 @@ public: */ OpTime getOpTime() const; + void setFromMigrate(bool value) & { + getDurableReplOperation().setFromMigrate(value); + } + /** * Same as setFromMigrate but only set when it is true. */ diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 630cb67aef9..7c1ba09f320 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -127,6 +127,10 @@ structs: optional: true description: "Identifier of the transaction statement(s) which generated this oplog entry" + fromMigrate: + type: bool + optional: true + description: "An operation caused by a chunk migration" OplogEntryBase: description: A document in which the server stores an oplog entry. @@ -150,10 +154,6 @@ structs: cpp_name: wallClockTime type: date description: "A wallclock time with MS resolution" - fromMigrate: - type: bool - optional: true - description: "An operation caused by a chunk migration" fromTenantMigration: type: uuid optional: true -- cgit v1.2.1