diff options
author | Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com> | 2021-12-09 17:36:50 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-07 23:04:09 +0000 |
commit | 54c977ae2b278136a87f4dd46e81bed3d5224d8e (patch) | |
tree | 48cb83751603101af1473adcc665a050d931da11 | |
parent | 1e6eb502e1754cfda0b39bf13605b73471641c70 (diff) | |
download | mongo-54c977ae2b278136a87f4dd46e81bed3d5224d8e.tar.gz |
SERVER-58694 Implement writing of pre-images for transactional update/replace/delete operations
33 files changed, 923 insertions, 67 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams.yml b/buildscripts/resmokeconfig/suites/change_streams.yml index 1493534fbb5..1b8715b5732 100644 --- a/buildscripts/resmokeconfig/suites/change_streams.yml +++ b/buildscripts/resmokeconfig/suites/change_streams.yml @@ -52,4 +52,4 @@ executor: bind_ip_all: '' set_parameters: enableTestCommands: 1 - num_nodes: 1 + num_nodes: 2 diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index 30293e405cd..409a6ef0dbf 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -14,8 +14,6 @@ selector: - assumes_write_concern_unchanged # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos - exclude_files: - - jstests/change_streams/change_stream_lookup_preimage_with_resharding.js executor: archive: diff --git a/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml b/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml index 516c2107e37..8ce0bbda59c 100644 --- a/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml @@ -6,6 +6,8 @@ selector: exclude_files: # Linearizable read concern is not supported for transactions. - jstests/core/txns/**/*.js + - jstests/core/write_change_stream_pit_preimage_in_transaction.js + # These tests use benchRun(), which isn't configured to use the overridden writeConcern. - jstests/core/bench_test*.js - jstests/core/benchrun_pipeline_updates.js # benchRun() used for writes diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml index 1e7dfcc243f..1e2cc5ef128 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml @@ -34,6 +34,7 @@ selector: - jstests/core/top.js # Change stream pre-images are not cloned during initial sync. - jstests/core/write_change_stream_pit_preimage.js + - jstests/core/write_change_stream_pit_preimage_in_transaction.js exclude_with_any_tags: - assumes_standalone_mongod diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml index 97d6ae931e1..5ad77fa692c 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml @@ -8,6 +8,7 @@ selector: - jstests/core/views/duplicate_ns.js # Change stream pre-images are not cloned during initial sync. - jstests/core/write_change_stream_pit_preimage.js + - jstests/core/write_change_stream_pit_preimage_in_transaction.js exclude_with_any_tags: - assumes_standalone_mongod diff --git a/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml index a12f9273ee6..abb23b22db0 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml @@ -11,6 +11,7 @@ selector: # Transactions do not support retryability of individual operations. # TODO: Remove this once it is supported (SERVER-33952). - jstests/core/txns/**/*.js + - jstests/core/write_change_stream_pit_preimage_in_transaction.js # The set_param1.js test attempts to compare the response from running the {getParameter: "*"} # command multiple times, which may observe the change to the "transactionLifetimeLimitSeconds" # server parameter. diff --git a/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml index 5848ee5bb64..ca566fe1d06 100644 --- a/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml @@ -6,6 +6,7 @@ selector: exclude_files: # Operations within a transaction do not support write concern. - jstests/core/txns/**/*.js + - jstests/core/write_change_stream_pit_preimage_in_transaction.js # Parallel shell is not causally consistent - jstests/core/benchrun_pipeline_updates.js diff --git a/jstests/auth/change_stream_pre_image_coll_role_auth.js b/jstests/auth/change_stream_pre_image_coll_role_auth.js index d13d0f85daa..688bd0f3e70 100644 --- a/jstests/auth/change_stream_pre_image_coll_role_auth.js +++ b/jstests/auth/change_stream_pre_image_coll_role_auth.js @@ -6,8 +6,6 @@ * requires_fcv_53, * featureFlagChangeStreamPreAndPostImages, * uses_change_streams, - * # TODO SERVER-58694: remove this tag. - * change_stream_does_not_expect_txns, * assumes_read_preference_unchanged, * requires_replication, * ] diff --git a/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js index 645c8fef807..4e8feacada3 100644 --- a/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js +++ b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js @@ -6,8 +6,6 @@ * * @tags: [ * uses_change_streams, - * # TODO SERVER-58694: remove this tag. - * change_stream_does_not_expect_txns, * ] */ (function() { diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js index 963926c03bc..30f8a4658a7 100644 --- a/jstests/change_streams/lookup_pit_pre_and_post_image.js +++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js @@ -2,8 +2,6 @@ // with different arguments for collections with 'changeStreamPreAndPostImages' being enabled. // @tags: [ // requires_fcv_52, -// # TODO SERVER-58694: remove this tag. -// change_stream_does_not_expect_txns, // ] (function() { "use strict"; diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js b/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js new file mode 100644 index 00000000000..560963f3f8c --- /dev/null +++ b/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js @@ -0,0 +1,151 @@ +/** + * Tests that point-in-time pre- and post-images are retrieved for update/replace/delete operations + * performed in a transaction and non-atomic "applyOps" command. + * @tags: [ + * requires_fcv_53, + * featureFlagChangeStreamPreAndPostImages, + * uses_transactions, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled and + // ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos. +load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction. + +const testDB = db.getSiblingDB(jsTestName()); +const cst = new ChangeStreamTest(testDB); +const coll = assertDropAndRecreateCollection( + testDB, "coll", {changeStreamPreAndPostImages: {enabled: true}}); +const collOther = assertDropAndRecreateCollection(testDB, "coll_regular"); + +// Verifies that change stream cursor 'changeStreamCursor' returns events defined in array +// 'expectedEvents' in any order. +function assertChangeEventsReturned(changeStreamCursor, expectedEvents) { + function toChangeEvent(event) { + const {_id, operationType, preImage, postImage} = event; + let result = { + documentKey: {_id}, + ns: {db: testDB.getName(), coll: coll.getName()}, + operationType, + }; + if (preImage != undefined) { + result.fullDocumentBeforeChange = preImage; + } + if (postImage != undefined) { + result.fullDocument = postImage; + } + return result; + } + cst.assertNextChangesEqualUnordered( + {cursor: changeStreamCursor, expectedChanges: expectedEvents.map(toChangeEvent)}); +} + +assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}])); + +// Open a change stream on the test collection with pre- and post-images requested. +const changeStreamCursor = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'}} + ], + collection: coll +}); + +// Gets collections used in the test for database 'db'. In some passthroughs the collections get +// sharded on 'getCollection()' invocation and it must happen when a transaction is not active. +function getCollections(db) { + return {coll: db[coll.getName()], otherColl: db[collOther.getName()]}; +} + +jsTestLog("Testing a transaction consisting of a single 'applyOps' entry."); +TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) { + assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}})); + assert.commandWorked(coll.replaceOne({_id: 2}, {a: "Long string"})); + assert.commandWorked(coll.deleteOne({_id: 3})); +}); +assertChangeEventsReturned(changeStreamCursor, [ + {_id: 1, operationType: "update", preImage: {_id: 1, a: 1}, postImage: {_id: 1, a: 2}}, + { + _id: 2, + operationType: "replace", + preImage: {_id: 2, a: 1}, + postImage: {_id: 2, a: "Long string"} + }, + {_id: 3, operationType: "delete", preImage: {_id: 3, a: 1}}, +]); + +jsTestLog("Testing a transaction consisting of multiple 'applyOps' entries."); +const largeStringSizeInBytes = 15 * 1024 * 1024; +const largeString = "b".repeat(largeStringSizeInBytes); +assert.commandWorked(coll.insert([{_id: 3, a: 1}])); +TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) { + assert.commandWorked(otherColl.insert({b: largeString})); + assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}})); + + assert.commandWorked(otherColl.insert({b: largeString})); + assert.commandWorked(coll.replaceOne({_id: 2}, {a: 1})); + + // Issue a second modification operation on the same document within the transaction. + assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}})); + + assert.commandWorked(coll.deleteOne({_id: 3})); + assert.commandWorked(otherColl.insert({b: largeString})); +}); +assertChangeEventsReturned(changeStreamCursor, [ + {_id: 3, operationType: "insert", postImage: {_id: 3, a: 1}}, + {_id: 1, operationType: "update", preImage: {_id: 1, a: 2}, postImage: {_id: 1, a: 3}}, + { + _id: 2, + operationType: "replace", + preImage: {_id: 2, a: "Long string"}, + postImage: {_id: 2, a: 1} + }, + {_id: 2, operationType: "update", preImage: {_id: 2, a: 1}, postImage: {_id: 2, a: 2}}, + {_id: 3, operationType: "delete", preImage: {_id: 3, a: 1}}, +]); + +jsTestLog("Testing a transaction consisting of multiple 'applyOps' entries with large pre-images."); +const largePreImageSizeInBytes = 7 * 1024 * 1024; +const largePreImageValue = "c".repeat(largePreImageSizeInBytes); +assert.commandWorked(coll.insert([{_id: 3, a: largePreImageValue}])); +TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) { + assert.commandWorked(coll.updateOne({_id: 3}, {$set: {b: 1}})); + assert.commandWorked(coll.deleteOne({_id: 3})); +}); +assertChangeEventsReturned(changeStreamCursor, [ + {_id: 3, operationType: "insert", postImage: {_id: 3, a: largePreImageValue}}, + { + _id: 3, + operationType: "update", + preImage: {_id: 3, a: largePreImageValue}, + postImage: {_id: 3, a: largePreImageValue, b: 1} + }, + { + _id: 3, + operationType: "delete", + preImage: {_id: 3, a: largePreImageValue, b: 1}, + }, +]); + +// "applyOps" command can only be issued on a replica set. +if (!FixtureHelpers.isMongos(testDB)) { + jsTestLog("Testing non-atomic 'applyOps' command."); + assert.commandWorked(coll.insert([{_id: 5, a: 1}, {_id: 6, a: 1}])); + assert.commandWorked(testDB.runCommand({ + applyOps: [ + {op: "u", ns: coll.getFullName(), o2: {_id: 5}, o: {$set: {a: 2}}}, + {op: "d", ns: coll.getFullName(), o: {_id: 6}} + ], + allowAtomic: false, + })); + assertChangeEventsReturned(changeStreamCursor, [ + {_id: 5, operationType: "insert", postImage: {_id: 5, a: 1}}, + {_id: 6, operationType: "insert", postImage: {_id: 6, a: 1}}, + {_id: 5, operationType: "update", preImage: {_id: 5, a: 1}, postImage: {_id: 5, a: 2}}, + {_id: 6, operationType: "delete", preImage: {_id: 6, a: 1}}, + ]); +} +})();
\ No newline at end of file diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js index a5549043284..7afba2009b0 100644 --- a/jstests/change_streams/lookup_pre_image.js +++ b/jstests/change_streams/lookup_pre_image.js @@ -6,8 +6,6 @@ * assumes_unsharded_collection, * do_not_wrap_aggregations_in_facets, * uses_multiple_connections, - * # TODO SERVER-58694: remove this tag. - * change_stream_does_not_expect_txns, * ] */ (function() { diff --git a/jstests/core/write_change_stream_pit_preimage_in_transaction.js b/jstests/core/write_change_stream_pit_preimage_in_transaction.js new file mode 100644 index 00000000000..3afd78ef463 --- /dev/null +++ b/jstests/core/write_change_stream_pit_preimage_in_transaction.js @@ -0,0 +1,165 @@ +/** + * Tests that pre-images are written to the pre-images collection on updates and deletes in + * transactions and for non-atomic "applyOps" command. + * @tags: [ + * requires_fcv_53, + * featureFlagChangeStreamPreAndPostImages, + * assumes_against_mongod_not_mongos, + * requires_capped, + * requires_replication, + * requires_getmore, + * uses_transactions, + * ] + */ +(function() { +"use strict"; + +load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers.prepareTransaction. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isReplSet(). +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load( + "jstests/libs/change_stream_util.js"); // For + // assertChangeStreamPreAndPostImagesCollectionOptionIsEnabled, + // assertChangeStreamPreAndPostImagesCollectionOptionIsAbsent, + // preImagesForOps. +load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction. + +// TODO SERVER-63272: remove this check. +if (!FixtureHelpers.isReplSet(db)) { + jsTestLog( + "Skipping the test as pre-images are not recorded in standalone mode and the test is designed to work with a replica set."); + return; +} + +const testDB = db.getSiblingDB(jsTestName()); +const localDB = db.getSiblingDB("local"); + +// Verifies that the expected pre-images are written during function 'ops' invocation. +function assertPreImagesWrittenForOps(db, ops, expectedPreImages) { + const writtenPreImages = preImagesForOps(db, ops); + assert.eq( + expectedPreImages.length, + writtenPreImages.length, + `Expected pre-image documents: ${tojson(expectedPreImages)}. Found pre-image documents: ${ + tojson(writtenPreImages)}.`); + + for (let idx = 0; idx < writtenPreImages.length; idx++) { + assert.eq(writtenPreImages[idx].preImage, expectedPreImages[idx]); + assertValidChangeStreamPreImageDocument(writtenPreImages[idx]); + } +} + +// Cross-checks the content of the pre-image document 'preImage' against the associated oplog entry. +function assertValidChangeStreamPreImageDocument(preImage) { + function assertChangeStreamPreImageDocumentMatchesOplogEntry(oplogEntry, preImage, wallTime) { + // Pre-images documents are recorded only for update and delete commands. + assert.contains(oplogEntry.op, ["u", "d"], oplogEntry); + assert.eq(preImage._id.nsUUID, oplogEntry.ui, oplogEntry); + assert.eq(preImage.operationTime, wallTime, oplogEntry); + if (oplogEntry.hasOwnProperty("o2")) { + assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry); + } + } + const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts}); + assert(oplogEntryCursor.hasNext()); + const oplogEntry = oplogEntryCursor.next(); + if (oplogEntry.o.hasOwnProperty("applyOps")) { + const applyOpsOplogEntry = oplogEntry; + assert(preImage._id.applyOpsIndex < applyOpsOplogEntry.o.applyOps.length); + const applyOpsEntry = applyOpsOplogEntry.o.applyOps[preImage._id.applyOpsIndex.toNumber()]; + assertChangeStreamPreImageDocumentMatchesOplogEntry( + applyOpsEntry, preImage, applyOpsOplogEntry.wall); + } else { + assert.eq(preImage._id.applyOpsIndex, + 0, + "applyOpsIndex value greater than 0 not expected for non-applyOps oplog entries"); + assertChangeStreamPreImageDocumentMatchesOplogEntry(oplogEntry, preImage, oplogEntry.wall); + } +} + +const coll = assertDropAndRecreateCollection( + testDB, "coll", {changeStreamPreAndPostImages: {enabled: true}}); +const otherColl = assertDropAndRecreateCollection(testDB, "coll_regular"); + +// Gets collections used in the test for database 'db'. In some passthroughs the collections get +// sharded on 'getCollection()' invocation and it must happen when a transaction is not active. +function getCollections(db) { + return {coll: db[coll.getName()], otherColl: db[otherColl.getName()]}; +} + +// Tests the pre-image writing behavior in a transaction. +(function testPreImageWritingInTransaction() { + // Verify that the pre-images are written correctly for a transaction with update and delete + // operations consisting of a single "applyOps" entry. + assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}])); + assert.commandWorked(otherColl.insert([{_id: 1, a: 1}])); + assertPreImagesWrittenForOps(db, function() { + TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) { + assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}})); + assert.commandWorked(otherColl.updateOne({_id: 1}, {$inc: {a: 1}})); + assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}})); + assert.commandWorked(coll.deleteOne({_id: 3})); + }); + }, [{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}]); + + // Verify that the pre-images are written correctly for a transaction with update and delete + // operations consisting of multiple "applyOps" entries. + const stringSizeInBytes = 15 * 1024 * 1024; + const largeString = "b".repeat(stringSizeInBytes); + assert.commandWorked(coll.insert([{_id: 3, a: 1}])); + assertPreImagesWrittenForOps(db, function() { + TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) { + assert.commandWorked(otherColl.insert({b: largeString})); + assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}})); + + assert.commandWorked(otherColl.insert({b: largeString})); + assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}})); + assert.commandWorked(coll.deleteOne({_id: 3})); + }); + }, [{_id: 1, a: 2}, {_id: 2, a: 2}, {_id: 3, a: 1}]); + + // Verify that large pre-images are written correctly for a transaction. + assert.commandWorked(coll.insert([{_id: 3, a: largeString}])); + assertPreImagesWrittenForOps(db, function() { + TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, _}) { + assert.commandWorked(coll.updateOne({_id: 1}, {$set: {b: largeString}})); + assert.commandWorked(coll.deleteOne({_id: 3})); + assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}})); + }); + }, [{_id: 1, a: 3}, {_id: 3, a: largeString}, {_id: 1, a: 3, b: largeString}]); +})(); + +(function testPreImageWritingForApplyOpsCommand() { + assert.commandWorked(coll.deleteMany({})); + assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}])); + + // Verify that pre-images are written correctly for the non-atomic "applyOps" command. + assertPreImagesWrittenForOps(db, function() { + assert.commandWorked(testDB.runCommand({ + applyOps: [ + {op: "u", ns: coll.getFullName(), o2: {_id: 1}, o: {$set: {a: 2}}}, + {op: "d", ns: coll.getFullName(), o: {_id: 2}} + ], + allowAtomic: false, + })); + }, [{_id: 1, a: 1}, {_id: 2, a: 1}]); +})(); + +(function testPreImageWritingForPreparedTransaction() { + assert.commandWorked(coll.deleteMany({})); + assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 3, a: 1}])); + + // Verify that pre-images are written correctly for a transaction that is prepared and then + // committed. + assertPreImagesWrittenForOps(db, function() { + const session = db.getMongo().startSession(); + const sessionDb = session.getDatabase(jsTestName()); + session.startTransaction({readConcern: {level: "majority"}}); + const collInner = sessionDb[coll.getName()]; + assert.commandWorked(collInner.updateOne({_id: 1}, {$inc: {a: 1}})); + assert.commandWorked(collInner.deleteOne({_id: 3})); + let prepareTimestamp = PrepareHelpers.prepareTransaction(session); + assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp)); + }, [{_id: 1, a: 1}, {_id: 3, a: 1}]); +})(); +}()); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 73350adbad9..befdc82aa46 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -103,6 +103,9 @@ function canonicalizeEventForTesting(event, expected) { if (!expected.hasOwnProperty("lsid")) delete event.lsid; + if (!expected.hasOwnProperty("updateDescription")) + delete event.updateDescription; + // TODO SERVER-50301: The 'truncatedArrays' field may not appear in the updateDescription // depending on whether $v:2 update oplog entries are enabled. When the expected event has an // empty 'truncatedFields' we do not require that the actual event contain the field. This @@ -550,6 +553,9 @@ function assertInvalidChangeStreamNss(dbName, collName = "test", options) { res, [ErrorCodes.InvalidNamespace, ErrorCodes.InvalidOptions])); } +const kPreImagesCollectionDatabase = "config"; +const kPreImagesCollectionName = "system.preimages"; + /** * Asserts that 'changeStreamPreAndPostImages' collection option is present and is enabled for * collection. @@ -569,22 +575,50 @@ function assertChangeStreamPreAndPostImagesCollectionOptionIsAbsent(db, collName // Returns the pre-images written while performing the write operations. function preImagesForOps(db, writeOps) { - const preImagesColl = db.getSiblingDB('config').getCollection("system.preimages"); - const numberOfPreImagesBefore = preImagesColl.find().itcount(); + const preImagesColl = + db.getSiblingDB(kPreImagesCollectionDatabase).getCollection(kPreImagesCollectionName); + const preImagesCollSortSpec = {"_id.ts": 1, "_id.applyOpsIndex": 1}; + + // Determine the id of the last pre-image document written to be able to determine the pre-image + // documents written by 'writeOps()'. The pre-image purging job may concurrently remove some + // pre-image documents while this function is executing. + const preImageIdsBefore = + preImagesColl.find({}, {}).sort(preImagesCollSortSpec).allowDiskUse().toArray(); + const lastPreImageId = (preImageIdsBefore.length > 0) + ? preImageIdsBefore[preImageIdsBefore.length - 1]._id + : undefined; // Perform the write operations. writeOps(); // Return only newly written pre-images. - return preImagesColl.find() + const preImageFilter = lastPreImageId ? {"_id.ts": {$gt: lastPreImageId.ts}} : {}; + const result = + preImagesColl.find(preImageFilter).sort(preImagesCollSortSpec).allowDiskUse().toArray(); + + // Verify that the result is correct by checking if the last pre-image still exists. However, if + // no pre-image document existed before 'writeOps()' invocation, the result may be incorrect. + assert(lastPreImageId === undefined || preImagesColl.find({_id: lastPreImageId}).itcount() == 1, + "Last pre-image document has been removed by the pre-image purging job."); + return result; +} + +/** + * Returns documents from the pre-images collection from 'connection' ordered by _id.ts, + * _id.applyOpsIndex ascending. + */ +function getPreImages(connection) { + return connection.getDB(kPreImagesCollectionDatabase)[kPreImagesCollectionName] + .find() .sort({"_id.ts": 1, "_id.applyOpsIndex": 1}) - .skip(numberOfPreImagesBefore) + .allowDiskUse() .toArray(); } function findPreImagesCollectionDescriptions(db) { - return db.getSiblingDB("config").runCommand("listCollections", - {filter: {name: "system.preimages"}}); + return db.getSiblingDB(kPreImagesCollectionDatabase).runCommand("listCollections", { + filter: {name: kPreImagesCollectionName} + }); } /** diff --git a/jstests/libs/parallelTester.js b/jstests/libs/parallelTester.js index 86704446cd4..54738e6fccf 100644 --- a/jstests/libs/parallelTester.js +++ b/jstests/libs/parallelTester.js @@ -239,6 +239,10 @@ if (typeof _threadInject != "undefined") { "collection_uuid_coll_mod.js", "collection_uuid_rename_collection.js", "collection_uuid_index_commands.js", + + // These tests rely on no writes happening that would force oplog truncation. + "write_change_stream_pit_preimage_in_transaction.js", + "write_change_stream_pit_preimage.js", ]); // Get files, including files in subdirectories. diff --git a/jstests/libs/transactions_util.js b/jstests/libs/transactions_util.js index 90b34cdefd9..5b17c51db9e 100644 --- a/jstests/libs/transactions_util.js +++ b/jstests/libs/transactions_util.js @@ -107,11 +107,30 @@ var TransactionsUtil = (function() { res.errorLabels.includes('TransientTransactionError'); } + // Runs a function 'func()' in a transaction on database 'db'. Invokes function + // 'beforeTransactionFunc()' before the transaction (can be used to get references to + // collections etc.). + // + // Function 'beforeTransactionFunc(db, session)' accepts database in session 'db' and the + // session 'session'. + // Function 'func(db, state)' accepts database in session 'db' and an object returned by + // 'beforeTransactionFunc()' - 'state'. + // 'transactionOptions' - parameters for the transaction. + function runInTransaction(db, beforeTransactionFunc, func, transactionOptions = {}) { + const session = db.getMongo().startSession(); + const sessionDb = session.getDatabase(db.getName()); + const state = beforeTransactionFunc(sessionDb, session); + session.startTransaction(transactionOptions); + func(sessionDb, state); + session.commitTransaction_forTesting(); + } + return { commandIsNonTxnAggregation, commandSupportsTxn, commandTypeCanSupportTxn, deepCopyObject, isTransientTransactionError, + runInTransaction, }; })(); diff --git a/jstests/noPassthrough/change_stream_preimages_standalone_mode.js b/jstests/noPassthrough/change_stream_preimages_standalone_mode.js index 1d84276bbcf..46e11a6928f 100644 --- a/jstests/noPassthrough/change_stream_preimages_standalone_mode.js +++ b/jstests/noPassthrough/change_stream_preimages_standalone_mode.js @@ -10,8 +10,6 @@ * requires_replication, * requires_fcv_52, * featureFlagChangeStreamPreAndPostImages, - * # TODO SERVER-58694: remove this tag. - * change_stream_does_not_expect_txns, * ] */ diff --git a/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js b/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js new file mode 100644 index 00000000000..28bf2066418 --- /dev/null +++ b/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js @@ -0,0 +1,107 @@ +/** + * Tests change stream point-in-time pre-images deletion replication to secondaries when primary + * node state is not the same as of the secondary - the pre-image document to be deleted exists on + * the primary node but does not exist on the secondary. + * + * @tags: [ + * # Change streams are only supported on WiredTiger. + * requires_wiredtiger, + * requires_fcv_53, + * featureFlagChangeStreamPreAndPostImages, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_util.js"); // For getPreImages(). +load("jstests/libs/fail_point_util.js"); +load('jstests/replsets/rslib.js'); // For getLatestOp, getFirstOplogEntry. + +const oplogSizeMB = 1; +const replTest = new ReplSetTest({ + name: jsTestName(), + nodes: [{}, {rsConfig: {priority: 0}}], + nodeOptions: { + setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})}, + oplogSize: oplogSizeMB + } +}); +replTest.startSet(); +replTest.initiate(); +const primaryNode = replTest.getPrimary(); + +const collectionName = "coll"; +const testDB = primaryNode.getDB(jsTestName()); + +// Create a collection with change stream pre- and post-images enabled. +assert.commandWorked( + testDB.createCollection(collectionName, {changeStreamPreAndPostImages: {enabled: true}})); +const coll = testDB[collectionName]; + +// Insert a document for the test. +assert.commandWorked(coll.insert({_id: 1, v: 1}, {writeConcern: {w: 2}})); + +// Add a new node that will perform an initial sync. Pause the initial sync process (using +// failpoint "initialSyncHangBeforeCopyingDatabases") before copying the database to perform +// document modifications to make the collection content more recent and create inconsistent +// data situation during oplog application. +const initialSyncNode = replTest.add({ + rsConfig: {priority: 0}, + setParameter: {'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'})} +}); + +// Wait until the new node starts and pauses on the fail point. +replTest.reInitiate(); +assert.commandWorked(initialSyncNode.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCopyingDatabases", + timesEntered: 1, + maxTimeMS: 60000 +})); + +// Update the document on the primary node. +assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}})); + +// Resume the initial sync process. +assert.commandWorked(initialSyncNode.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"})); + +// Wait until the initial sync process is complete and the new node becomes a fully +// functioning secondary. +replTest.waitForState(initialSyncNode, ReplSetTest.State.SECONDARY); + +// Verify that pre-images were not written during the logical initial sync. At this point the +// pre-image collections in the nodes of the replica set are out of sync. +let preImageDocuments = getPreImages(initialSyncNode); +assert.eq(preImageDocuments.length, 0, preImageDocuments); + +// Force deletion of all pre-images and ensure that this replicates to all nodes. +// Roll over all current oplog entries. +const lastOplogEntryToBeRemoved = getLatestOp(primaryNode); +assert.neq(lastOplogEntryToBeRemoved, null); +const largeString = 'a'.repeat(256 * 1024); +const otherColl = primaryNode.getDB(jsTestName())["otherCollection"]; + +// Checks if the oplog has been rolled over from the timestamp of +// 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater +// than 'lastOplogEntryToBeRemoved'. +function oplogIsRolledOver() { + return timestampCmp(lastOplogEntryToBeRemoved.ts, + getFirstOplogEntry(primaryNode, {readConcern: "majority"}).ts) <= 0; +} + +while (!oplogIsRolledOver()) { + assert.commandWorked( + otherColl.insert({long_str: largeString}, {writeConcern: {w: "majority"}})); +} + +// Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' job deletes the expired pre-images +// (all). +assert.soon(() => { + const preImages = getPreImages(primaryNode); + return preImages.length == 0; +}); + +// Verify that all nodes get in sync and do not crash. +replTest.awaitReplication(); +replTest.stopSet(); +})(); diff --git a/jstests/replsets/change_stream_pit_pre_images.js b/jstests/replsets/change_stream_pit_pre_images.js index 726507d0fcf..43e0af1f8b2 100644 --- a/jstests/replsets/change_stream_pit_pre_images.js +++ b/jstests/replsets/change_stream_pit_pre_images.js @@ -13,30 +13,24 @@ */ (function() { "use strict"; +load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers.prepareTransaction. +load("jstests/libs/change_stream_util.js"); // For getPreImages(). load("jstests/libs/fail_point_util.js"); load("jstests/libs/retryable_writes_util.js"); +load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction. const testName = jsTestName(); -const preImagesCollectionDatabase = "config"; -const preImagesCollectionName = "system.preimages"; const replTest = new ReplSetTest({ name: testName, nodes: [{}, {rsConfig: {priority: 0}}], - nodeOptions: - {setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})}} + nodeOptions: { + setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})}, + oplogSize: 1024 + } }); replTest.startSet(); replTest.initiate(); -// Returns documents from the pre-images collection from 'node' ordered by _id.ts, _id.applyOpsIndex -// ascending. -function getPreImages(node) { - return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName] - .find() - .sort({"_id.ts": 1, "_id.applyOpsIndex": 1}) - .toArray(); -} - // Asserts that documents in the pre-images collection on the primary node are the same as on a // secondary node. function assertPreImagesCollectionOnPrimaryMatchesSecondary() { @@ -84,7 +78,62 @@ for (const [collectionName, collectionOptions] of [ {_id: 5, v: 3}); } + function issueWriteCommandsInTransaction(testDB) { + assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]})); + assert.commandWorked(coll.insert([{_id: 6, a: 1}, {_id: 7, a: 1}, {_id: 8, a: 1}])); + + const transactionOptions = {readConcern: {level: "majority"}, writeConcern: {w: 2}}; + + // Issue commands in a single "applyOps" transaction. + TransactionsUtil.runInTransaction(testDB, () => {}, function(db, state) { + const coll = db[collectionName]; + assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}})); + assert.commandWorked(coll.replaceOne({_id: 7}, {a: "Long string"})); + assert.commandWorked(coll.deleteOne({_id: 8})); + }, transactionOptions); + + // Issue commands in a multiple-"applyOps" transaction. + assert.commandWorked(coll.insert({_id: 8, a: 1})); + TransactionsUtil.runInTransaction(testDB, () => {}, function(db, state) { + const coll = db[collectionName]; + const largeString = "a".repeat(15 * 1024 * 1024); + assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}})); + assert.commandWorked(coll.insert({_id: 9, a: largeString})); + assert.commandWorked(coll.insert( + {_id: 10, a: largeString})); // Should go to the second "applyOps" entry. + assert.commandWorked(coll.replaceOne({_id: 7}, {a: "String"})); + assert.commandWorked(coll.deleteOne({_id: 8})); + }, transactionOptions); + + // Issue commands in a transaction that gets prepared before a commit. + assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]})); + assert.commandWorked(coll.insert([{_id: 6, a: 1}, {_id: 7, a: 1}, {_id: 8, a: 1}])); + const session = testDB.getMongo().startSession(); + const sessionDb = session.getDatabase(testDB.getName()); + session.startTransaction(); + const collInner = sessionDb[coll.getName()]; + assert.commandWorked(collInner.updateOne({_id: 6}, {$inc: {a: 1}})); + assert.commandWorked(collInner.replaceOne({_id: 7}, {a: "Long string"})); + assert.commandWorked(collInner.deleteOne({_id: 8})); + let prepareTimestamp = PrepareHelpers.prepareTransaction(session); + assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp)); + } + + function issueNonAtomicApplyOpsCommand(testDB) { + assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 9}}, {_id: {$lte: 10}}]})); + assert.commandWorked(coll.insert([{_id: 9, a: 1}, {_id: 10, a: 1}])); + assert.commandWorked(testDB.runCommand({ + applyOps: [ + {op: "u", ns: coll.getFullName(), o2: {_id: 9}, o: {$set: {a: 2}}}, + {op: "d", ns: coll.getFullName(), o: {_id: 10}} + ], + allowAtomic: false, + })); + } + (function testSteadyStateReplication() { + jsTestLog("Testing pre-image replication to secondaries."); + // Insert a document. assert.commandWorked(coll.insert({_id: 1, v: 1, largeField: "AAAAAAAAAAAAAAAAAAAAAAAA"})); @@ -106,12 +155,26 @@ for (const [collectionName, collectionOptions] of [ // Issue retryable "findAndModify" commands. issueRetryableFindAndModifyCommands(testDB); - // Verify that a related change stream pre-images were replicated to the secondary. + // Verify that related change stream pre-images were replicated to the secondary. + replTest.awaitReplication(); + assertPreImagesCollectionOnPrimaryMatchesSecondary(); + + issueWriteCommandsInTransaction(testDB); + + // Verify that related change stream pre-images were replicated to the secondary. + replTest.awaitReplication(); + assertPreImagesCollectionOnPrimaryMatchesSecondary(); + + issueNonAtomicApplyOpsCommand(testDB); + + // Verify that related change stream pre-images were replicated to the secondary. replTest.awaitReplication(); assertPreImagesCollectionOnPrimaryMatchesSecondary(); })(); (function testInitialSync() { + jsTestLog("Testing pre-image replication during the logical initial sync."); + // Insert a document for deletion test. assert.commandWorked(coll.insert({_id: 3, field: "A"}, {writeConcern: {w: 2}})); @@ -139,8 +202,9 @@ for (const [collectionName, collectionOptions] of [ // Delete the document on the primary node. assert.commandWorked(coll.deleteOne({_id: 3}, {writeConcern: {w: 2}})); - // Issue retryable "findAndModify" commands. issueRetryableFindAndModifyCommands(testDB); + issueNonAtomicApplyOpsCommand(testDB); + issueWriteCommandsInTransaction(testDB); // Resume the initial sync process. assert.commandWorked(initialSyncNode.adminCommand( @@ -167,6 +231,8 @@ for (const [collectionName, collectionOptions] of [ })(); (function testStartupRecovery() { + jsTestLog("Testing pre-image writing during startup recovery."); + // Pause check-pointing on the primary node to ensure new pre-images are not flushed to the // disk. const pauseCheckpointThreadFailPoint = @@ -180,8 +246,9 @@ for (const [collectionName, collectionOptions] of [ assert.commandWorked(coll.insert({_id: 4, field: "A"})); assert.commandWorked(coll.deleteOne({_id: 4}, {writeConcern: {w: 2}})); - // Issue retryable "findAndModify" commands. issueRetryableFindAndModifyCommands(testDB); + issueNonAtomicApplyOpsCommand(testDB); + issueWriteCommandsInTransaction(testDB); // Do an unclean shutdown of the primary node, and then restart. replTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}); @@ -194,6 +261,5 @@ for (const [collectionName, collectionOptions] of [ assertPreImagesCollectionOnPrimaryMatchesSecondary(); })(); } - replTest.stopSet(); })(); diff --git a/jstests/change_streams/change_stream_lookup_preimage_with_resharding.js b/jstests/sharding/change_streams/change_stream_lookup_preimage_with_resharding.js index ebc33433247..b00f94aed0b 100644 --- a/jstests/change_streams/change_stream_lookup_preimage_with_resharding.js +++ b/jstests/sharding/change_streams/change_stream_lookup_preimage_with_resharding.js @@ -5,13 +5,8 @@ * * @tags: [ * featureFlagChangeStreamPreAndPostImages, - * featureFlagClusteredIndexes, - * requires_fcv_52, + * requires_fcv_53, * uses_change_streams, - * # TODO SERVER-58694: remove this tag. - * change_stream_does_not_expect_txns, - * # TODO SERVER-60238: remove this tag. - * does_not_support_causal_consistency, * assumes_unsharded_collection, * assumes_read_preference_unchanged, * ] @@ -30,10 +25,11 @@ reshardingTest.setup(); const donorShardNames = reshardingTest.donorShardNames; const recipientShardNames = reshardingTest.recipientShardNames; +const collectionName = "test.whileResharding"; // Create a sharded collection with 'oldShardKey' as the shard key. const coll = reshardingTest.createShardedCollection({ - ns: "test.whileResharding", + ns: collectionName, shardKeyPattern: {oldShardKey: 1}, chunks: [ {min: {oldShardKey: MinKey}, max: {oldShardKey: MaxKey}, shard: donorShardNames[0]}, @@ -82,7 +78,9 @@ assert.commandWorked(coll.getDB().runCommand( // Insert some documents before resharding the collection so that there is data to clone. assert.commandWorked(coll.insert([ {_id: 0, annotation: "pre-resharding-insert", oldShardKey: 0, newShardKey: 2}, - {_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3} + {_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3}, + {_id: 2, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3}, + {_id: 3, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3}, ])); // Verify that 'insert' operations does not record any pre-images. @@ -121,6 +119,23 @@ reshardingTest.withReshardingInBackground( assert.commandWorked( coll.update({_id: 1}, {$set: {annotation: "during-resharding-update"}})); assert.commandWorked(coll.remove({_id: 1}, {justOne: true})); + + // Perform some operations in a transaction. + assert.retryNoExcept( + () => { + const session = coll.getDB().getMongo().startSession(); + const sessionDB = session.getDatabase(coll.getDB().getName()); + const sessionColl = sessionDB.getCollection(coll.getName()); + session.startTransaction(); + assert.commandWorked(sessionColl.update( + {_id: 2}, {$set: {annotation: "during-resharding-txn-update"}})); + assert.commandWorked(sessionColl.remove({_id: 3}, {justOne: true})); + session.commitTransaction_forTesting(); + return true; + }, + "Failed to execute a transaction while resharding was in progress", + 10 /*num_attempts*/, + 100 /*intervalMS*/); }); // Verify that after the resharding is complete, the pre-image collection exists on the recipient @@ -142,7 +157,9 @@ verifyPreImages(donorConn, [ {_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3}, {_id: 0, annotation: "pre-resharding-update", oldShardKey: 0, newShardKey: 2}, {_id: 1, annotation: "pre-resharding-update", oldShardKey: 1, newShardKey: 3}, - {_id: 1, annotation: "during-resharding-update", oldShardKey: 1, newShardKey: 3} + {_id: 1, annotation: "during-resharding-update", oldShardKey: 1, newShardKey: 3}, + {_id: 2, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3}, + {_id: 3, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3}, ]); verifyPreImages(recipientConn, [{_id: 0, annotation: "during-resharding-update", oldShardKey: 0, newShardKey: 2}]); @@ -176,6 +193,13 @@ verifyChangeStreamEvents(csCursor, [ {opType: "delete", id: 1, prevAnnotation: "during-resharding-update"}, { opType: "update", + id: 2, + prevAnnotation: "pre-resharding-txn", + curAnnotation: "during-resharding-txn-update" + }, + {opType: "delete", id: 3, prevAnnotation: "pre-resharding-txn"}, + { + opType: "update", id: 0, prevAnnotation: "during-resharding-update", curAnnotation: "post-resharding-update" diff --git a/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js b/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js new file mode 100644 index 00000000000..de048fb4ed1 --- /dev/null +++ b/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js @@ -0,0 +1,23 @@ +/** + * Tests that retryable internal transactions for "findAndModify" commands against collection with + * changeStreamPreAndPostImages enabled are retryable. + * + * @tags: [ + * requires_fcv_53, + * featureFlagInternalTransactions, + * featureFlagChangeStreamPreAndPostImages + * ] + */ +(function() { +'use strict'; + +load('jstests/sharding/libs/retryable_internal_transaction_test.js'); + +const transactionTest = + new RetryableInternalTransactionTest({changeStreamPreAndPostImages: {enabled: true}}); +transactionTest.runTestsForAllRetryableInternalTransactionTypes( + transactionTest.runFindAndModifyTestsEnableImageCollection); +transactionTest.runTestsForAllRetryableInternalTransactionTypes( + transactionTest.runFindAndModifyTestsDisableImageCollection); +transactionTest.stop(); +})(); diff --git a/jstests/sharding/libs/retryable_internal_transaction_test.js b/jstests/sharding/libs/retryable_internal_transaction_test.js index 2d3ac9d4863..327ea4ab417 100644 --- a/jstests/sharding/libs/retryable_internal_transaction_test.js +++ b/jstests/sharding/libs/retryable_internal_transaction_test.js @@ -31,7 +31,7 @@ function getOplogEntriesForTxnWithRetries(rs, lsid, txnNumber) { return oplogEntries; } -function RetryableInternalTransactionTest() { +function RetryableInternalTransactionTest(collectionOptions = {}) { // Set a large oplogSize since this test runs a find command to get the oplog entries for // every transaction that it runs including large transactions and with the default oplogSize, // oplog reading done by the find command may not be able to keep up with the oplog truncation, @@ -51,10 +51,9 @@ function RetryableInternalTransactionTest() { const kDbName = "testDb"; const kCollName = "testColl"; const mongosTestDB = st.s.getDB(kDbName); + assert.commandWorked(mongosTestDB.createCollection(kCollName, collectionOptions)); const mongosTestColl = mongosTestDB.getCollection(kCollName); - assert.commandWorked(mongosTestDB.createCollection(kCollName)); - function makeSessionIdForRetryableInternalTransaction() { return {id: UUID(), txnNumber: NumberLong(0), txnUUID: UUID()}; } diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 895845a0fde..b8a7b4f28e3 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -80,6 +80,7 @@ namespace mongo { using repl::DurableOplogEntry; using repl::MutableOplogEntry; +using ChangeStreamPreImageRecordingMode = repl::ReplOperation::ChangeStreamPreImageRecordingMode; const OperationContext::Decoration<boost::optional<repl::DocumentKey>> documentKeyDecoration = OperationContext::declareDecoration<boost::optional<repl::DocumentKey>>(); @@ -639,6 +640,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { invariant(args.updateArgs->preImageDoc); operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setPreImageRecordedForRetryableInternalTransaction(); if (args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kSideCollection) { operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); @@ -655,7 +657,26 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } } else if (args.updateArgs->preImageRecordingEnabledForCollection) { invariant(args.updateArgs->preImageDoc); + tassert( + 5869402, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection); operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kOplog); + } + + if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection) { + invariant(args.updateArgs->preImageDoc); + tassert( + 5869403, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.updateArgs->preImageRecordingEnabledForCollection); + operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kPreImagesCollection); } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); @@ -813,17 +834,33 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, "Deleted document must be present for pre-image recording", args.deletedDoc); operation.setPreImage(args.deletedDoc->getOwned()); + operation.setPreImageRecordedForRetryableInternalTransaction(); if (args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kSideCollection) { operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); } } } - if (args.preImageRecordingEnabledForCollection) { + + if (args.changeStreamPreAndPostImagesEnabledForCollection) { + tassert(5869400, + "Deleted document must be present for pre-image recording", + args.deletedDoc); + tassert( + 5869401, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.preImageRecordingEnabledForCollection); + operation.setPreImage(args.deletedDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kPreImagesCollection); + } else if (args.preImageRecordingEnabledForCollection) { tassert(5868701, "Deleted document must be present for pre-image recording", args.deletedDoc); operation.setPreImage(args.deletedDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kOplog); } operation.setDestinedRecipient(destinedRecipientDecoration(opCtx)); @@ -1251,6 +1288,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, } namespace { + +/** + * Writes pre-images for update/replace/delete operations packed into a single "applyOps" entry to + * the change stream pre-images collection if required. The operations are defined by sequence + * ['stmtBegin', 'stmtEnd'). 'applyOpsTimestamp' and 'operationTime' are the timestamp and the wall + * clock time, respectively, of the "applyOps" entry. A pre-image is recorded for an operation only + * if pre-images are enabled for the collection the operation is issued on. + */ +void writeChangeStreamPreImagesForApplyOpsEntries( + OperationContext* opCtx, + const std::vector<repl::ReplOperation>::iterator& stmtBegin, + const std::vector<repl::ReplOperation>::iterator& stmtEnd, + Timestamp applyOpsTimestamp, + Date_t operationTime) { + int64_t applyOpsIndex{0}; + for (auto stmtIterator = stmtBegin; stmtIterator != stmtEnd; ++stmtIterator) { + auto& operation = *stmtIterator; + if (operation.isChangeStreamPreImageRecordedInPreImagesCollection() && + !operation.getNss().isTemporaryReshardingCollection()) { + invariant(operation.getUuid()); + invariant(!operation.getPreImage().isEmpty()); + writeToChangeStreamPreImagesCollection( + opCtx, + ChangeStreamPreImage{ + ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex}, + operationTime, + operation.getPreImage()}); + } + ++applyOpsIndex; + } +} + // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array // field. Appends as many operations as possible to the array (and their corresponding statement // ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the @@ -1448,8 +1517,11 @@ int logOplogEntriesForTransaction( if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { for (auto& statement : *stmts) { - if (!statement.getPreImage().isEmpty() && - statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) { + if (statement.isChangeStreamPreImageRecordedInOplog() || + (statement.isPreImageRecordedForRetryableInternalTransaction() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) { + invariant(!statement.getPreImage().isEmpty()); + // Note that 'needsRetryImage' stores the image kind that needs to stored in the // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the // pre-image will be written to the image collection (after all the applyOps oplog @@ -1573,6 +1645,10 @@ int logOplogEntriesForTransaction( prevWriteOpTime.writeOpTime.getTimestamp()}; } + const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp(); + writeChangeStreamPreImagesForApplyOpsEntries( + opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime()); + // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; numEntriesWritten++; @@ -1626,7 +1702,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, }); } -} // namespace +} // namespace void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp index 68e2e6949f8..3bdbf3954c5 100644 --- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp @@ -44,6 +44,10 @@ namespace mongo { void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, const ChangeStreamPreImage& preImage) { const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace; + tassert(5869404, + str::stream() << "Invalid pre-image document applyOpsIndex: " + << preImage.getId().getApplyOpsIndex(), + preImage.getId().getApplyOpsIndex() >= 0); // This lock acquisition can block on a stronger lock held by another operation modifying the // pre-images collection. There are no known cases where an operation holding an exclusive lock @@ -52,7 +56,9 @@ void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX); UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON()); tassert(5868601, - "Failed to insert a new document into pre-images collection", + str::stream() << "Failed to insert a new document into the pre-images collection: ts: " + << preImage.getId().getTs().toString() + << ", applyOpsIndex: " << preImage.getId().getApplyOpsIndex(), !res.existing && !res.upsertedId.isEmpty()); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 773b13c1050..efe83289131 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -174,6 +174,7 @@ public: static constexpr StringData kLsidField = "lsid"_sd; static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd; static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd; + static constexpr StringData kApplyOpsTsField = "applyOpsTs"_sd; static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd; // The target namespace of a rename operation. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 148e0c9a385..b59348fc4e5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -350,6 +350,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // unwinding a transaction. auto txnOpIndex = input[DocumentSourceChangeStream::kTxnOpIndexField]; auto applyOpsIndex = input[DocumentSourceChangeStream::kApplyOpsIndexField]; + auto applyOpsEntryTs = input[DocumentSourceChangeStream::kApplyOpsTsField]; // Add some additional fields only relevant to transactions. if (!txnOpIndex.missing()) { @@ -405,10 +406,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document } else { // Set 'kPreImageIdField' to the 'ChangeStreamPreImageId'. The DSCSAddPreImage stage // will use the id in order to fetch the pre-image from the pre-images collection. - const auto preImageId = - ChangeStreamPreImageId(uuid.getUuid(), - ts.getTimestamp(), - applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong()); + const auto preImageId = ChangeStreamPreImageId( + uuid.getUuid(), + applyOpsEntryTs.missing() ? ts.getTimestamp() : applyOpsEntryTs.getTimestamp(), + applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong()); doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON())); } } @@ -447,9 +448,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString()); - if (_preImageRequested) { + if (_preImageRequested || _postImageRequested) { deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString()); + deps->fields.insert(DocumentSourceChangeStream::kApplyOpsTsField.toString()); } return DepsTracker::State::EXHAUSTIVE_ALL; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index 8e51e29e7b9..cdd07aa79b9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -268,6 +268,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO // Initialize iterators at the beginning of the transaction. _currentApplyOpsIt = _currentApplyOps.getArray().begin(); + _currentApplyOpsTs = firstTimestamp.getTimestamp(); _currentApplyOpsIndex = 0; _txnOpIndex = 0; } @@ -304,6 +305,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::getNextTrans BSONType::Array == bsonOp["applyOps"].type()); _currentApplyOps = Value(bsonOp["applyOps"]); + _currentApplyOpsTs = applyOpsEntry.getTimestamp(); _currentApplyOpsIt = _currentApplyOps.getArray().begin(); _currentApplyOpsIndex = 0; } @@ -338,6 +340,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired // the current entry. newDoc.addField(DocumentSourceChangeStream::kApplyOpsIndexField, Value(static_cast<long long>(applyOpsIndex()))); + newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs())); newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime)); newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h index a2659178d6b..f2b17259980 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h @@ -133,6 +133,15 @@ private: return _currentApplyOpsIndex - 1; } + /** + * Returns the timestamp of the "applyOps" entry containing the last operation returned by + * 'getNextTransactionOp()'. If 'getNextTransactionOp()' has not been called, returns the + * timestamp of the first "applyOps" entry in the transaction. + */ + Timestamp applyOpsTs() const { + return _currentApplyOpsTs; + } + Timestamp clusterTime() const { return _clusterTime; } @@ -194,6 +203,9 @@ private: // The index of the next entry within the current 'applyOps' array. size_t _currentApplyOpsIndex; + // The timestamp of the current 'applyOps' entry. + Timestamp _currentApplyOpsTs; + // Our current place within the entire transaction, which may consist of multiple 'applyOps' // arrays. size_t _txnOpIndex; diff --git a/src/mongo/db/repl/apply_ops_command_info.cpp b/src/mongo/db/repl/apply_ops_command_info.cpp index ced7b83b770..16057086653 100644 --- a/src/mongo/db/repl/apply_ops_command_info.cpp +++ b/src/mongo/db/repl/apply_ops_command_info.cpp @@ -130,6 +130,7 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, auto operationDocs = info.getOperations(); bool alwaysUpsert = info.getAlwaysUpsert() && !applyOpsOplogEntry.getTxnNumber(); + uint64_t applyOpsIdx{0}; for (const auto& operationDoc : operationDocs) { // Make sure that the inner ops are not malformed or over-specified. ReplOperation::parse(IDLParserErrorContext("extractOperations"), operationDoc); @@ -145,11 +146,19 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, builder.appendElementsUnique(topLevelDoc); auto operation = builder.obj(); - OplogEntry oplogEntry{operation}; - if (oplogEntry.getNeedsRetryImage()) { - oplogEntry.setTimestampForRetryImage(applyOpsOplogEntry.getTimestamp()); + operations->emplace_back(operation); + + // Preserve index of operation in the "applyOps" oplog entry, timestamp, and wall clock time + // of the "applyOps" entry. + auto& lastOperation = operations->back(); + lastOperation.setApplyOpsIndex(applyOpsIdx); + lastOperation.setApplyOpsTimestamp(applyOpsOplogEntry.getTimestamp()); + lastOperation.setApplyOpsWallClockTime(applyOpsOplogEntry.getWallClockTime()); + ++applyOpsIdx; + + if (lastOperation.getNeedsRetryImage()) { + lastOperation.setTimestampForRetryImage(applyOpsOplogEntry.getTimestamp()); } - operations->emplace_back(std::move(oplogEntry)); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0030db48c13..f307741e720 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1084,10 +1084,11 @@ void writeChangeStreamPreImage(OperationContext* opCtx, const CollectionPtr& collection, const mongo::repl::OplogEntry& oplogEntry, const BSONObj& preImage) { - ChangeStreamPreImageId preImageId{ - collection->uuid(), oplogEntry.getTimestamp(), 0 /*applyOpsIndex*/}; + ChangeStreamPreImageId preImageId{collection->uuid(), + oplogEntry.getTimestampForPreImage(), + static_cast<int64_t>(oplogEntry.getApplyOpsIndex())}; ChangeStreamPreImage preImageDocument{ - std::move(preImageId), oplogEntry.getWallClockTime(), preImage}; + std::move(preImageId), oplogEntry.getWallClockTimeForPreImage(), preImage}; writeToChangeStreamPreImagesCollection(opCtx, preImageDocument); } } // namespace @@ -1739,7 +1740,11 @@ Status applyOperation_inlock(OperationContext* opCtx, writeChangeStreamPreImage(opCtx, collection, op, *(result.requestedPreImage)); } - if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary) { + // It is legal for a delete operation on the pre-images collection to delete zero + // documents - pre-image collections are not guaranteed to contain the same set of + // documents at all times. + if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary && + !requestNss.isChangeStreamPreImagesCollection()) { LOGV2_WARNING(2170002, "Applied a delete which did not delete anything in steady state " "replication", diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index ece6da5d3ee..093c4b6620a 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -736,6 +736,37 @@ bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const { return _entry.isSingleOplogEntryTransactionWithCommand(); } +uint64_t OplogEntry::getApplyOpsIndex() const { + return _applyOpsIndex; +} + +void OplogEntry::setApplyOpsIndex(uint64_t value) { + _applyOpsIndex = value; +} + +const boost::optional<mongo::Timestamp>& OplogEntry::getApplyOpsTimestamp() const { + return _applyOpsTimestamp; +} + +void OplogEntry::setApplyOpsTimestamp(boost::optional<mongo::Timestamp> value) { + _applyOpsTimestamp = value; +} + +const boost::optional<mongo::Date_t>& OplogEntry::getApplyOpsWallClockTime() const { + return _applyOpsWallClockTime; +} +void OplogEntry::setApplyOpsWallClockTime(boost::optional<mongo::Date_t> value) { + _applyOpsWallClockTime = value; +} + +mongo::Timestamp OplogEntry::getTimestampForPreImage() const { + return getApplyOpsTimestamp().get_value_or(getTimestamp()); +} + +mongo::Date_t OplogEntry::getWallClockTimeForPreImage() const { + return getApplyOpsWallClockTime().get_value_or(getWallClockTime()); +} + bool OplogEntry::isCrudOpType() const { return _entry.isCrudOpType(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 8cfdf210ca9..29e8a0a71d9 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -68,6 +68,20 @@ constexpr auto kInitiatingSetMsg = "initiating set"_sd; class ReplOperation : public DurableReplOperation { public: + /** + * The way the change stream pre-images are recorded upon update/replace/delete operation. + */ + enum class ChangeStreamPreImageRecordingMode { + // The pre-image is not recorded. + kOff, + + // The pre-image is recorded in the change stream pre-images collection. + kPreImagesCollection, + + // The pre-image is recorded in the oplog as a separate entry. + kOplog, + }; + static ReplOperation parse(const IDLParserErrorContext& ctxt, const BSONObj& bsonObject) { ReplOperation o; o.parseProtected(ctxt, bsonObject); @@ -109,6 +123,54 @@ public: } /** + * Returns the change stream pre-images recording mode applied for this operation. + */ + ChangeStreamPreImageRecordingMode getChangeStreamPreImageRecordingMode() const { + return _preImageRecordingMode; + } + + /** + * Sets the change stream pre-images recording mode to apply for this operation. + */ + void setChangeStreamPreImageRecordingMode(ChangeStreamPreImageRecordingMode value) { + _preImageRecordingMode = value; + } + + /** + * Returns true if the change stream pre-image is recorded in a dedicated oplog entry for this + * operation. + */ + bool isChangeStreamPreImageRecordedInOplog() const { + return ReplOperation::ChangeStreamPreImageRecordingMode::kOplog == + getChangeStreamPreImageRecordingMode(); + } + + /** + * Returns true if the change stream pre-image is recorded in the change stream pre-images + * collection for this operation. + */ + bool isChangeStreamPreImageRecordedInPreImagesCollection() const { + return ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection == + getChangeStreamPreImageRecordingMode(); + } + + /** + * Returns true if the operation is in a retryable internal transaction and pre-image must be + * recorded for the operation. + */ + bool isPreImageRecordedForRetryableInternalTransaction() const { + return _preImageRecordedForRetryableInternalTransaction; + } + + /** + * Sets whether the operation is in a retryable internal transaction and pre-image must be + * recorded for the operation. + */ + void setPreImageRecordedForRetryableInternalTransaction(bool value = true) { + _preImageRecordedForRetryableInternalTransaction = value; + } + + /** * Sets the statement ids for this ReplOperation to 'stmtIds' if it does not contain any * kUninitializedStmtId (i.e. placeholder statement id). */ @@ -134,6 +196,14 @@ private: // the images should be persisted. BSONObj _fullPreImage; BSONObj _fullPostImage; + + // Change stream pre-image recording mode applied to this operation. + ChangeStreamPreImageRecordingMode _preImageRecordingMode{ + ChangeStreamPreImageRecordingMode::kOff}; + + // Whether a pre-image must be recorded for this operation since it is in a retryable internal + // transaction. + bool _preImageRecordedForRetryableInternalTransaction{false}; }; /** @@ -637,6 +707,46 @@ public: bool isTerminalApplyOps() const; bool isSingleOplogEntryTransaction() const; bool isSingleOplogEntryTransactionWithCommand() const; + + /** + * Returns an index of this operation in the "applyOps" entry, if the operation is packed in the + * "applyOps" entry. Otherwise returns 0. + */ + uint64_t getApplyOpsIndex() const; + + void setApplyOpsIndex(uint64_t value); + + /** + * Returns a timestamp of the "applyOps" entry, if this operation is packed in the "applyOps" + * entry. Otherwise returns boost::none. + */ + const boost::optional<mongo::Timestamp>& getApplyOpsTimestamp() const; + + void setApplyOpsTimestamp(boost::optional<mongo::Timestamp> value); + + /** + * Returns wall clock time of the "applyOps" entry, if this operation is packed in the + * "applyOps" entry. Otherwise returns boost::none. + */ + const boost::optional<mongo::Date_t>& getApplyOpsWallClockTime() const; + + void setApplyOpsWallClockTime(boost::optional<mongo::Date_t> value); + + /** + * Returns a timestamp to use for recording of a change stream pre-image in the change stream + * pre-images collection. Returns a timestamp of the "applyOps" entry, if this operation is + * packed in the "applyOps" entry. Otherwise returns a timestamp of this oplog entry. + */ + mongo::Timestamp getTimestampForPreImage() const; + + /** + * Returns a wall clock time to use for recording of a change stream pre-image in the change + * stream pre-images collection. Returns a wall clock time of the "applyOps" entry, if this + * operation is packed in the "applyOps" entry. Otherwise returns a wall clock time of this + * oplog entry. + */ + mongo::Date_t getWallClockTimeForPreImage() const; + bool isCrudOpType() const; bool isUpdateOrDelete() const; bool isIndexCommandType() const; @@ -656,6 +766,18 @@ private: std::shared_ptr<DurableOplogEntry> _preImageOp; std::shared_ptr<DurableOplogEntry> _postImageOp; + // An index of this oplog entry in the associated "applyOps" oplog entry when this entry is + // extracted from an "applyOps" oplog entry. Otherwise, the index value must be 0. + uint64_t _applyOpsIndex{0}; + + // A timestamp of the associated "applyOps" oplog entry when this oplog entry is extracted from + // an "applyOps" oplog entry. + boost::optional<Timestamp> _applyOpsTimestamp{boost::none}; + + // Wall clock time of the associated "applyOps" oplog entry when this oplog entry is extracted + // from an "applyOps" oplog entry. + boost::optional<Date_t> _applyOpsWallClockTime{boost::none}; + bool _isForCappedCollection = false; // During oplog application on secondaries, oplog entries extracted from each applyOps oplog diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index dac6f597f7b..562ffdbd552 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1677,7 +1677,10 @@ void TransactionParticipant::Participant::addTransactionOperation( repl::DurableOplogEntry::getDurableReplOperationSize(operation); if (!operation.getPreImage().isEmpty()) { p().transactionOperationBytes += operation.getPreImage().objsize(); - ++p().numberOfPrePostImagesToWrite; + if (operation.isChangeStreamPreImageRecordedInOplog() || + operation.isPreImageRecordedForRetryableInternalTransaction()) { + ++p().numberOfPrePostImagesToWrite; + } } if (!operation.getPostImage().isEmpty()) { p().transactionOperationBytes += operation.getPostImage().objsize(); |