summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>2022-02-08 17:01:49 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-09 15:36:16 +0000
commit88d59bfe9d5ee2c9938ae251f7a77a8bf1250a6b (patch)
tree11f60598f603a8d18811cddf690145e266de8433
parent2ad330e831461b8451979716faf27a34af9bb8d2 (diff)
downloadmongo-88d59bfe9d5ee2c9938ae251f7a77a8bf1250a6b.tar.gz
SERVER-58694 Implement writing of pre-images for transactional update/replace/delete operations
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml1
-rw-r--r--jstests/auth/change_stream_pre_image_coll_role_auth.js2
-rw-r--r--jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js2
-rw-r--r--jstests/change_streams/lookup_pit_pre_and_post_image.js2
-rw-r--r--jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js151
-rw-r--r--jstests/change_streams/lookup_pre_image.js2
-rw-r--r--jstests/core/write_change_stream_pit_preimage.js1
-rw-r--r--jstests/core/write_change_stream_pit_preimage_in_transaction.js166
-rw-r--r--jstests/libs/change_stream_util.js46
-rw-r--r--jstests/libs/parallelTester.js4
-rw-r--r--jstests/libs/transactions_util.js19
-rw-r--r--jstests/noPassthrough/change_stream_preimages_standalone_mode.js2
-rw-r--r--jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js107
-rw-r--r--jstests/replsets/change_stream_pit_pre_images.js100
-rw-r--r--jstests/sharding/change_streams/change_stream_lookup_preimage_with_resharding.js (renamed from jstests/change_streams/change_stream_lookup_preimage_with_resharding.js)42
-rw-r--r--jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js23
-rw-r--r--jstests/sharding/libs/retryable_internal_transaction_test.js5
-rw-r--r--src/mongo/db/op_observer_impl.cpp84
-rw-r--r--src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h12
-rw-r--r--src/mongo/db/repl/apply_ops_command_info.cpp17
-rw-r--r--src/mongo/db/repl/oplog.cpp13
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp31
-rw-r--r--src/mongo/db/repl/oplog_entry.h122
-rw-r--r--src/mongo/db/transaction_participant.cpp5
34 files changed, 925 insertions, 67 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams.yml b/buildscripts/resmokeconfig/suites/change_streams.yml
index 1493534fbb5..1b8715b5732 100644
--- a/buildscripts/resmokeconfig/suites/change_streams.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams.yml
@@ -52,4 +52,4 @@ executor:
bind_ip_all: ''
set_parameters:
enableTestCommands: 1
- num_nodes: 1
+ num_nodes: 2
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index 30293e405cd..409a6ef0dbf 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -14,8 +14,6 @@ selector:
- assumes_write_concern_unchanged
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
- exclude_files:
- - jstests/change_streams/change_stream_lookup_preimage_with_resharding.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml b/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml
index 516c2107e37..8ce0bbda59c 100644
--- a/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/read_concern_linearizable_passthrough.yml
@@ -6,6 +6,8 @@ selector:
exclude_files:
# Linearizable read concern is not supported for transactions.
- jstests/core/txns/**/*.js
+ - jstests/core/write_change_stream_pit_preimage_in_transaction.js
+
# These tests use benchRun(), which isn't configured to use the overridden writeConcern.
- jstests/core/bench_test*.js
- jstests/core/benchrun_pipeline_updates.js # benchRun() used for writes
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
index 1e7dfcc243f..1e2cc5ef128 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
@@ -34,6 +34,7 @@ selector:
- jstests/core/top.js
# Change stream pre-images are not cloned during initial sync.
- jstests/core/write_change_stream_pit_preimage.js
+ - jstests/core/write_change_stream_pit_preimage_in_transaction.js
exclude_with_any_tags:
- assumes_standalone_mongod
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
index 97d6ae931e1..5ad77fa692c 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
@@ -8,6 +8,7 @@ selector:
- jstests/core/views/duplicate_ns.js
# Change stream pre-images are not cloned during initial sync.
- jstests/core/write_change_stream_pit_preimage.js
+ - jstests/core/write_change_stream_pit_preimage_in_transaction.js
exclude_with_any_tags:
- assumes_standalone_mongod
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml
index a12f9273ee6..abb23b22db0 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_reconfig_jscore_passthrough.yml
@@ -11,6 +11,7 @@ selector:
# Transactions do not support retryability of individual operations.
# TODO: Remove this once it is supported (SERVER-33952).
- jstests/core/txns/**/*.js
+ - jstests/core/write_change_stream_pit_preimage_in_transaction.js
# The set_param1.js test attempts to compare the response from running the {getParameter: "*"}
# command multiple times, which may observe the change to the "transactionLifetimeLimitSeconds"
# server parameter.
diff --git a/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml
index 5848ee5bb64..ca566fe1d06 100644
--- a/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/secondary_reads_passthrough.yml
@@ -6,6 +6,7 @@ selector:
exclude_files:
# Operations within a transaction do not support write concern.
- jstests/core/txns/**/*.js
+ - jstests/core/write_change_stream_pit_preimage_in_transaction.js
# Parallel shell is not causally consistent
- jstests/core/benchrun_pipeline_updates.js
diff --git a/jstests/auth/change_stream_pre_image_coll_role_auth.js b/jstests/auth/change_stream_pre_image_coll_role_auth.js
index d13d0f85daa..688bd0f3e70 100644
--- a/jstests/auth/change_stream_pre_image_coll_role_auth.js
+++ b/jstests/auth/change_stream_pre_image_coll_role_auth.js
@@ -6,8 +6,6 @@
* requires_fcv_53,
* featureFlagChangeStreamPreAndPostImages,
* uses_change_streams,
- * # TODO SERVER-58694: remove this tag.
- * change_stream_does_not_expect_txns,
* assumes_read_preference_unchanged,
* requires_replication,
* ]
diff --git a/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js
index 645c8fef807..4e8feacada3 100644
--- a/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js
+++ b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js
@@ -6,8 +6,6 @@
*
* @tags: [
* uses_change_streams,
- * # TODO SERVER-58694: remove this tag.
- * change_stream_does_not_expect_txns,
* ]
*/
(function() {
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js
index 963926c03bc..30f8a4658a7 100644
--- a/jstests/change_streams/lookup_pit_pre_and_post_image.js
+++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js
@@ -2,8 +2,6 @@
// with different arguments for collections with 'changeStreamPreAndPostImages' being enabled.
// @tags: [
// requires_fcv_52,
-// # TODO SERVER-58694: remove this tag.
-// change_stream_does_not_expect_txns,
// ]
(function() {
"use strict";
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js b/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js
new file mode 100644
index 00000000000..560963f3f8c
--- /dev/null
+++ b/jstests/change_streams/lookup_pit_pre_and_post_image_in_transaction.js
@@ -0,0 +1,151 @@
+/**
+ * Tests that point-in-time pre- and post-images are retrieved for update/replace/delete operations
+ * performed in a transaction and non-atomic "applyOps" command.
+ * @tags: [
+ * requires_fcv_53,
+ * featureFlagChangeStreamPreAndPostImages,
+ * uses_transactions,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled and
+ // ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
+load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction.
+
+const testDB = db.getSiblingDB(jsTestName());
+const cst = new ChangeStreamTest(testDB);
+const coll = assertDropAndRecreateCollection(
+ testDB, "coll", {changeStreamPreAndPostImages: {enabled: true}});
+const collOther = assertDropAndRecreateCollection(testDB, "coll_regular");
+
+// Verifies that change stream cursor 'changeStreamCursor' returns events defined in array
+// 'expectedEvents' in any order.
+function assertChangeEventsReturned(changeStreamCursor, expectedEvents) {
+ function toChangeEvent(event) {
+ const {_id, operationType, preImage, postImage} = event;
+ let result = {
+ documentKey: {_id},
+ ns: {db: testDB.getName(), coll: coll.getName()},
+ operationType,
+ };
+ if (preImage != undefined) {
+ result.fullDocumentBeforeChange = preImage;
+ }
+ if (postImage != undefined) {
+ result.fullDocument = postImage;
+ }
+ return result;
+ }
+ cst.assertNextChangesEqualUnordered(
+ {cursor: changeStreamCursor, expectedChanges: expectedEvents.map(toChangeEvent)});
+}
+
+assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}]));
+
+// Open a change stream on the test collection with pre- and post-images requested.
+const changeStreamCursor = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'}}
+ ],
+ collection: coll
+});
+
+// Gets collections used in the test for database 'db'. In some passthroughs the collections get
+// sharded on 'getCollection()' invocation and it must happen when a transaction is not active.
+function getCollections(db) {
+ return {coll: db[coll.getName()], otherColl: db[collOther.getName()]};
+}
+
+jsTestLog("Testing a transaction consisting of a single 'applyOps' entry.");
+TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) {
+ assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.replaceOne({_id: 2}, {a: "Long string"}));
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+});
+assertChangeEventsReturned(changeStreamCursor, [
+ {_id: 1, operationType: "update", preImage: {_id: 1, a: 1}, postImage: {_id: 1, a: 2}},
+ {
+ _id: 2,
+ operationType: "replace",
+ preImage: {_id: 2, a: 1},
+ postImage: {_id: 2, a: "Long string"}
+ },
+ {_id: 3, operationType: "delete", preImage: {_id: 3, a: 1}},
+]);
+
+jsTestLog("Testing a transaction consisting of multiple 'applyOps' entries.");
+const largeStringSizeInBytes = 15 * 1024 * 1024;
+const largeString = "b".repeat(largeStringSizeInBytes);
+assert.commandWorked(coll.insert([{_id: 3, a: 1}]));
+TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) {
+ assert.commandWorked(otherColl.insert({b: largeString}));
+ assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ assert.commandWorked(otherColl.insert({b: largeString}));
+ assert.commandWorked(coll.replaceOne({_id: 2}, {a: 1}));
+
+ // Issue a second modification operation on the same document within the transaction.
+ assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}}));
+
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+ assert.commandWorked(otherColl.insert({b: largeString}));
+});
+assertChangeEventsReturned(changeStreamCursor, [
+ {_id: 3, operationType: "insert", postImage: {_id: 3, a: 1}},
+ {_id: 1, operationType: "update", preImage: {_id: 1, a: 2}, postImage: {_id: 1, a: 3}},
+ {
+ _id: 2,
+ operationType: "replace",
+ preImage: {_id: 2, a: "Long string"},
+ postImage: {_id: 2, a: 1}
+ },
+ {_id: 2, operationType: "update", preImage: {_id: 2, a: 1}, postImage: {_id: 2, a: 2}},
+ {_id: 3, operationType: "delete", preImage: {_id: 3, a: 1}},
+]);
+
+jsTestLog("Testing a transaction consisting of multiple 'applyOps' entries with large pre-images.");
+const largePreImageSizeInBytes = 7 * 1024 * 1024;
+const largePreImageValue = "c".repeat(largePreImageSizeInBytes);
+assert.commandWorked(coll.insert([{_id: 3, a: largePreImageValue}]));
+TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) {
+ assert.commandWorked(coll.updateOne({_id: 3}, {$set: {b: 1}}));
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+});
+assertChangeEventsReturned(changeStreamCursor, [
+ {_id: 3, operationType: "insert", postImage: {_id: 3, a: largePreImageValue}},
+ {
+ _id: 3,
+ operationType: "update",
+ preImage: {_id: 3, a: largePreImageValue},
+ postImage: {_id: 3, a: largePreImageValue, b: 1}
+ },
+ {
+ _id: 3,
+ operationType: "delete",
+ preImage: {_id: 3, a: largePreImageValue, b: 1},
+ },
+]);
+
+// "applyOps" command can only be issued on a replica set.
+if (!FixtureHelpers.isMongos(testDB)) {
+ jsTestLog("Testing non-atomic 'applyOps' command.");
+ assert.commandWorked(coll.insert([{_id: 5, a: 1}, {_id: 6, a: 1}]));
+ assert.commandWorked(testDB.runCommand({
+ applyOps: [
+ {op: "u", ns: coll.getFullName(), o2: {_id: 5}, o: {$set: {a: 2}}},
+ {op: "d", ns: coll.getFullName(), o: {_id: 6}}
+ ],
+ allowAtomic: false,
+ }));
+ assertChangeEventsReturned(changeStreamCursor, [
+ {_id: 5, operationType: "insert", postImage: {_id: 5, a: 1}},
+ {_id: 6, operationType: "insert", postImage: {_id: 6, a: 1}},
+ {_id: 5, operationType: "update", preImage: {_id: 5, a: 1}, postImage: {_id: 5, a: 2}},
+ {_id: 6, operationType: "delete", preImage: {_id: 6, a: 1}},
+ ]);
+}
+})(); \ No newline at end of file
diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js
index a5549043284..7afba2009b0 100644
--- a/jstests/change_streams/lookup_pre_image.js
+++ b/jstests/change_streams/lookup_pre_image.js
@@ -6,8 +6,6 @@
* assumes_unsharded_collection,
* do_not_wrap_aggregations_in_facets,
* uses_multiple_connections,
- * # TODO SERVER-58694: remove this tag.
- * change_stream_does_not_expect_txns,
* ]
*/
(function() {
diff --git a/jstests/core/write_change_stream_pit_preimage.js b/jstests/core/write_change_stream_pit_preimage.js
index 4c463899515..758294ab37f 100644
--- a/jstests/core/write_change_stream_pit_preimage.js
+++ b/jstests/core/write_change_stream_pit_preimage.js
@@ -7,6 +7,7 @@
// requires_capped,
// requires_replication,
// requires_getmore,
+// no_selinux,
// ]
(function() {
"use strict";
diff --git a/jstests/core/write_change_stream_pit_preimage_in_transaction.js b/jstests/core/write_change_stream_pit_preimage_in_transaction.js
new file mode 100644
index 00000000000..06709dc72f2
--- /dev/null
+++ b/jstests/core/write_change_stream_pit_preimage_in_transaction.js
@@ -0,0 +1,166 @@
+/**
+ * Tests that pre-images are written to the pre-images collection on updates and deletes in
+ * transactions and for non-atomic "applyOps" command.
+ * @tags: [
+ * requires_fcv_53,
+ * featureFlagChangeStreamPreAndPostImages,
+ * assumes_against_mongod_not_mongos,
+ * requires_capped,
+ * requires_replication,
+ * requires_getmore,
+ * uses_transactions,
+ * no_selinux,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers.prepareTransaction.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isReplSet().
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load(
+ "jstests/libs/change_stream_util.js"); // For
+ // assertChangeStreamPreAndPostImagesCollectionOptionIsEnabled,
+ // assertChangeStreamPreAndPostImagesCollectionOptionIsAbsent,
+ // preImagesForOps.
+load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction.
+
+// TODO SERVER-63272: remove this check.
+if (!FixtureHelpers.isReplSet(db)) {
+ jsTestLog(
+ "Skipping the test as pre-images are not recorded in standalone mode and the test is designed to work with a replica set.");
+ return;
+}
+
+const testDB = db.getSiblingDB(jsTestName());
+const localDB = db.getSiblingDB("local");
+
+// Verifies that the expected pre-images are written during function 'ops' invocation.
+function assertPreImagesWrittenForOps(db, ops, expectedPreImages) {
+ const writtenPreImages = preImagesForOps(db, ops);
+ assert.eq(
+ expectedPreImages.length,
+ writtenPreImages.length,
+ `Expected pre-image documents: ${tojson(expectedPreImages)}. Found pre-image documents: ${
+ tojson(writtenPreImages)}.`);
+
+ for (let idx = 0; idx < writtenPreImages.length; idx++) {
+ assert.eq(writtenPreImages[idx].preImage, expectedPreImages[idx]);
+ assertValidChangeStreamPreImageDocument(writtenPreImages[idx]);
+ }
+}
+
+// Cross-checks the content of the pre-image document 'preImage' against the associated oplog entry.
+function assertValidChangeStreamPreImageDocument(preImage) {
+ function assertChangeStreamPreImageDocumentMatchesOplogEntry(oplogEntry, preImage, wallTime) {
+ // Pre-images documents are recorded only for update and delete commands.
+ assert.contains(oplogEntry.op, ["u", "d"], oplogEntry);
+ assert.eq(preImage._id.nsUUID, oplogEntry.ui, oplogEntry);
+ assert.eq(preImage.operationTime, wallTime, oplogEntry);
+ if (oplogEntry.hasOwnProperty("o2")) {
+ assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry);
+ }
+ }
+ const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts});
+ assert(oplogEntryCursor.hasNext());
+ const oplogEntry = oplogEntryCursor.next();
+ if (oplogEntry.o.hasOwnProperty("applyOps")) {
+ const applyOpsOplogEntry = oplogEntry;
+ assert(preImage._id.applyOpsIndex < applyOpsOplogEntry.o.applyOps.length);
+ const applyOpsEntry = applyOpsOplogEntry.o.applyOps[preImage._id.applyOpsIndex.toNumber()];
+ assertChangeStreamPreImageDocumentMatchesOplogEntry(
+ applyOpsEntry, preImage, applyOpsOplogEntry.wall);
+ } else {
+ assert.eq(preImage._id.applyOpsIndex,
+ 0,
+ "applyOpsIndex value greater than 0 not expected for non-applyOps oplog entries");
+ assertChangeStreamPreImageDocumentMatchesOplogEntry(oplogEntry, preImage, oplogEntry.wall);
+ }
+}
+
+const coll = assertDropAndRecreateCollection(
+ testDB, "coll", {changeStreamPreAndPostImages: {enabled: true}});
+const otherColl = assertDropAndRecreateCollection(testDB, "coll_regular");
+
+// Gets collections used in the test for database 'db'. In some passthroughs the collections get
+// sharded on 'getCollection()' invocation and it must happen when a transaction is not active.
+function getCollections(db) {
+ return {coll: db[coll.getName()], otherColl: db[otherColl.getName()]};
+}
+
+// Tests the pre-image writing behavior in a transaction.
+(function testPreImageWritingInTransaction() {
+ // Verify that the pre-images are written correctly for a transaction with update and delete
+ // operations consisting of a single "applyOps" entry.
+ assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}]));
+ assert.commandWorked(otherColl.insert([{_id: 1, a: 1}]));
+ assertPreImagesWrittenForOps(db, function() {
+ TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) {
+ assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}}));
+ assert.commandWorked(otherColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+ });
+ }, [{_id: 1, a: 1}, {_id: 2, a: 1}, {_id: 3, a: 1}]);
+
+ // Verify that the pre-images are written correctly for a transaction with update and delete
+ // operations consisting of multiple "applyOps" entries.
+ const stringSizeInBytes = 15 * 1024 * 1024;
+ const largeString = "b".repeat(stringSizeInBytes);
+ assert.commandWorked(coll.insert([{_id: 3, a: 1}]));
+ assertPreImagesWrittenForOps(db, function() {
+ TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, otherColl}) {
+ assert.commandWorked(otherColl.insert({b: largeString}));
+ assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ assert.commandWorked(otherColl.insert({b: largeString}));
+ assert.commandWorked(coll.updateOne({_id: 2}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+ });
+ }, [{_id: 1, a: 2}, {_id: 2, a: 2}, {_id: 3, a: 1}]);
+
+ // Verify that large pre-images are written correctly for a transaction.
+ assert.commandWorked(coll.insert([{_id: 3, a: largeString}]));
+ assertPreImagesWrittenForOps(db, function() {
+ TransactionsUtil.runInTransaction(testDB, getCollections, function(db, {coll, _}) {
+ assert.commandWorked(coll.updateOne({_id: 1}, {$set: {b: largeString}}));
+ assert.commandWorked(coll.deleteOne({_id: 3}));
+ assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {a: 1}}));
+ });
+ }, [{_id: 1, a: 3}, {_id: 3, a: largeString}, {_id: 1, a: 3, b: largeString}]);
+})();
+
+(function testPreImageWritingForApplyOpsCommand() {
+ assert.commandWorked(coll.deleteMany({}));
+ assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 2, a: 1}]));
+
+ // Verify that pre-images are written correctly for the non-atomic "applyOps" command.
+ assertPreImagesWrittenForOps(db, function() {
+ assert.commandWorked(testDB.runCommand({
+ applyOps: [
+ {op: "u", ns: coll.getFullName(), o2: {_id: 1}, o: {$set: {a: 2}}},
+ {op: "d", ns: coll.getFullName(), o: {_id: 2}}
+ ],
+ allowAtomic: false,
+ }));
+ }, [{_id: 1, a: 1}, {_id: 2, a: 1}]);
+})();
+
+(function testPreImageWritingForPreparedTransaction() {
+ assert.commandWorked(coll.deleteMany({}));
+ assert.commandWorked(coll.insert([{_id: 1, a: 1}, {_id: 3, a: 1}]));
+
+ // Verify that pre-images are written correctly for a transaction that is prepared and then
+ // committed.
+ assertPreImagesWrittenForOps(db, function() {
+ const session = db.getMongo().startSession();
+ const sessionDb = session.getDatabase(jsTestName());
+ session.startTransaction({readConcern: {level: "majority"}});
+ const collInner = sessionDb[coll.getName()];
+ assert.commandWorked(collInner.updateOne({_id: 1}, {$inc: {a: 1}}));
+ assert.commandWorked(collInner.deleteOne({_id: 3}));
+ let prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
+ }, [{_id: 1, a: 1}, {_id: 3, a: 1}]);
+})();
+}());
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 73350adbad9..befdc82aa46 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -103,6 +103,9 @@ function canonicalizeEventForTesting(event, expected) {
if (!expected.hasOwnProperty("lsid"))
delete event.lsid;
+ if (!expected.hasOwnProperty("updateDescription"))
+ delete event.updateDescription;
+
// TODO SERVER-50301: The 'truncatedArrays' field may not appear in the updateDescription
// depending on whether $v:2 update oplog entries are enabled. When the expected event has an
// empty 'truncatedFields' we do not require that the actual event contain the field. This
@@ -550,6 +553,9 @@ function assertInvalidChangeStreamNss(dbName, collName = "test", options) {
res, [ErrorCodes.InvalidNamespace, ErrorCodes.InvalidOptions]));
}
+const kPreImagesCollectionDatabase = "config";
+const kPreImagesCollectionName = "system.preimages";
+
/**
* Asserts that 'changeStreamPreAndPostImages' collection option is present and is enabled for
* collection.
@@ -569,22 +575,50 @@ function assertChangeStreamPreAndPostImagesCollectionOptionIsAbsent(db, collName
// Returns the pre-images written while performing the write operations.
function preImagesForOps(db, writeOps) {
- const preImagesColl = db.getSiblingDB('config').getCollection("system.preimages");
- const numberOfPreImagesBefore = preImagesColl.find().itcount();
+ const preImagesColl =
+ db.getSiblingDB(kPreImagesCollectionDatabase).getCollection(kPreImagesCollectionName);
+ const preImagesCollSortSpec = {"_id.ts": 1, "_id.applyOpsIndex": 1};
+
+ // Determine the id of the last pre-image document written to be able to determine the pre-image
+ // documents written by 'writeOps()'. The pre-image purging job may concurrently remove some
+ // pre-image documents while this function is executing.
+ const preImageIdsBefore =
+ preImagesColl.find({}, {}).sort(preImagesCollSortSpec).allowDiskUse().toArray();
+ const lastPreImageId = (preImageIdsBefore.length > 0)
+ ? preImageIdsBefore[preImageIdsBefore.length - 1]._id
+ : undefined;
// Perform the write operations.
writeOps();
// Return only newly written pre-images.
- return preImagesColl.find()
+ const preImageFilter = lastPreImageId ? {"_id.ts": {$gt: lastPreImageId.ts}} : {};
+ const result =
+ preImagesColl.find(preImageFilter).sort(preImagesCollSortSpec).allowDiskUse().toArray();
+
+ // Verify that the result is correct by checking if the last pre-image still exists. However, if
+ // no pre-image document existed before 'writeOps()' invocation, the result may be incorrect.
+ assert(lastPreImageId === undefined || preImagesColl.find({_id: lastPreImageId}).itcount() == 1,
+ "Last pre-image document has been removed by the pre-image purging job.");
+ return result;
+}
+
+/**
+ * Returns documents from the pre-images collection from 'connection' ordered by _id.ts,
+ * _id.applyOpsIndex ascending.
+ */
+function getPreImages(connection) {
+ return connection.getDB(kPreImagesCollectionDatabase)[kPreImagesCollectionName]
+ .find()
.sort({"_id.ts": 1, "_id.applyOpsIndex": 1})
- .skip(numberOfPreImagesBefore)
+ .allowDiskUse()
.toArray();
}
function findPreImagesCollectionDescriptions(db) {
- return db.getSiblingDB("config").runCommand("listCollections",
- {filter: {name: "system.preimages"}});
+ return db.getSiblingDB(kPreImagesCollectionDatabase).runCommand("listCollections", {
+ filter: {name: kPreImagesCollectionName}
+ });
}
/**
diff --git a/jstests/libs/parallelTester.js b/jstests/libs/parallelTester.js
index 86704446cd4..54738e6fccf 100644
--- a/jstests/libs/parallelTester.js
+++ b/jstests/libs/parallelTester.js
@@ -239,6 +239,10 @@ if (typeof _threadInject != "undefined") {
"collection_uuid_coll_mod.js",
"collection_uuid_rename_collection.js",
"collection_uuid_index_commands.js",
+
+ // These tests rely on no writes happening that would force oplog truncation.
+ "write_change_stream_pit_preimage_in_transaction.js",
+ "write_change_stream_pit_preimage.js",
]);
// Get files, including files in subdirectories.
diff --git a/jstests/libs/transactions_util.js b/jstests/libs/transactions_util.js
index 90b34cdefd9..5b17c51db9e 100644
--- a/jstests/libs/transactions_util.js
+++ b/jstests/libs/transactions_util.js
@@ -107,11 +107,30 @@ var TransactionsUtil = (function() {
res.errorLabels.includes('TransientTransactionError');
}
+ // Runs a function 'func()' in a transaction on database 'db'. Invokes function
+ // 'beforeTransactionFunc()' before the transaction (can be used to get references to
+ // collections etc.).
+ //
+ // Function 'beforeTransactionFunc(db, session)' accepts database in session 'db' and the
+ // session 'session'.
+ // Function 'func(db, state)' accepts database in session 'db' and an object returned by
+ // 'beforeTransactionFunc()' - 'state'.
+ // 'transactionOptions' - parameters for the transaction.
+ function runInTransaction(db, beforeTransactionFunc, func, transactionOptions = {}) {
+ const session = db.getMongo().startSession();
+ const sessionDb = session.getDatabase(db.getName());
+ const state = beforeTransactionFunc(sessionDb, session);
+ session.startTransaction(transactionOptions);
+ func(sessionDb, state);
+ session.commitTransaction_forTesting();
+ }
+
return {
commandIsNonTxnAggregation,
commandSupportsTxn,
commandTypeCanSupportTxn,
deepCopyObject,
isTransientTransactionError,
+ runInTransaction,
};
})();
diff --git a/jstests/noPassthrough/change_stream_preimages_standalone_mode.js b/jstests/noPassthrough/change_stream_preimages_standalone_mode.js
index 1d84276bbcf..46e11a6928f 100644
--- a/jstests/noPassthrough/change_stream_preimages_standalone_mode.js
+++ b/jstests/noPassthrough/change_stream_preimages_standalone_mode.js
@@ -10,8 +10,6 @@
* requires_replication,
* requires_fcv_52,
* featureFlagChangeStreamPreAndPostImages,
- * # TODO SERVER-58694: remove this tag.
- * change_stream_does_not_expect_txns,
* ]
*/
diff --git a/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js b/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js
new file mode 100644
index 00000000000..28bf2066418
--- /dev/null
+++ b/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js
@@ -0,0 +1,107 @@
+/**
+ * Tests change stream point-in-time pre-images deletion replication to secondaries when primary
+ * node state is not the same as of the secondary - the pre-image document to be deleted exists on
+ * the primary node but does not exist on the secondary.
+ *
+ * @tags: [
+ * # Change streams are only supported on WiredTiger.
+ * requires_wiredtiger,
+ * requires_fcv_53,
+ * featureFlagChangeStreamPreAndPostImages,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_util.js"); // For getPreImages().
+load("jstests/libs/fail_point_util.js");
+load('jstests/replsets/rslib.js'); // For getLatestOp, getFirstOplogEntry.
+
+const oplogSizeMB = 1;
+const replTest = new ReplSetTest({
+ name: jsTestName(),
+ nodes: [{}, {rsConfig: {priority: 0}}],
+ nodeOptions: {
+ setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})},
+ oplogSize: oplogSizeMB
+ }
+});
+replTest.startSet();
+replTest.initiate();
+const primaryNode = replTest.getPrimary();
+
+const collectionName = "coll";
+const testDB = primaryNode.getDB(jsTestName());
+
+// Create a collection with change stream pre- and post-images enabled.
+assert.commandWorked(
+ testDB.createCollection(collectionName, {changeStreamPreAndPostImages: {enabled: true}}));
+const coll = testDB[collectionName];
+
+// Insert a document for the test.
+assert.commandWorked(coll.insert({_id: 1, v: 1}, {writeConcern: {w: 2}}));
+
+// Add a new node that will perform an initial sync. Pause the initial sync process (using
+// failpoint "initialSyncHangBeforeCopyingDatabases") before copying the database to perform
+// document modifications to make the collection content more recent and create inconsistent
+// data situation during oplog application.
+const initialSyncNode = replTest.add({
+ rsConfig: {priority: 0},
+ setParameter: {'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'})}
+});
+
+// Wait until the new node starts and pauses on the fail point.
+replTest.reInitiate();
+assert.commandWorked(initialSyncNode.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCopyingDatabases",
+ timesEntered: 1,
+ maxTimeMS: 60000
+}));
+
+// Update the document on the primary node.
+assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));
+
+// Resume the initial sync process.
+assert.commandWorked(initialSyncNode.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}));
+
+// Wait until the initial sync process is complete and the new node becomes a fully
+// functioning secondary.
+replTest.waitForState(initialSyncNode, ReplSetTest.State.SECONDARY);
+
+// Verify that pre-images were not written during the logical initial sync. At this point the
+// pre-image collections in the nodes of the replica set are out of sync.
+let preImageDocuments = getPreImages(initialSyncNode);
+assert.eq(preImageDocuments.length, 0, preImageDocuments);
+
+// Force deletion of all pre-images and ensure that this replicates to all nodes.
+// Roll over all current oplog entries.
+const lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
+assert.neq(lastOplogEntryToBeRemoved, null);
+const largeString = 'a'.repeat(256 * 1024);
+const otherColl = primaryNode.getDB(jsTestName())["otherCollection"];
+
+// Checks if the oplog has been rolled over from the timestamp of
+// 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater
+// than 'lastOplogEntryToBeRemoved'.
+function oplogIsRolledOver() {
+ return timestampCmp(lastOplogEntryToBeRemoved.ts,
+ getFirstOplogEntry(primaryNode, {readConcern: "majority"}).ts) <= 0;
+}
+
+while (!oplogIsRolledOver()) {
+ assert.commandWorked(
+ otherColl.insert({long_str: largeString}, {writeConcern: {w: "majority"}}));
+}
+
+// Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' job deletes the expired pre-images
+// (all).
+assert.soon(() => {
+ const preImages = getPreImages(primaryNode);
+ return preImages.length == 0;
+});
+
+// Verify that all nodes get in sync and do not crash.
+replTest.awaitReplication();
+replTest.stopSet();
+})();
diff --git a/jstests/replsets/change_stream_pit_pre_images.js b/jstests/replsets/change_stream_pit_pre_images.js
index 726507d0fcf..43e0af1f8b2 100644
--- a/jstests/replsets/change_stream_pit_pre_images.js
+++ b/jstests/replsets/change_stream_pit_pre_images.js
@@ -13,30 +13,24 @@
*/
(function() {
"use strict";
+load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers.prepareTransaction.
+load("jstests/libs/change_stream_util.js"); // For getPreImages().
load("jstests/libs/fail_point_util.js");
load("jstests/libs/retryable_writes_util.js");
+load("jstests/libs/transactions_util.js"); // For TransactionsUtil.runInTransaction.
const testName = jsTestName();
-const preImagesCollectionDatabase = "config";
-const preImagesCollectionName = "system.preimages";
const replTest = new ReplSetTest({
name: testName,
nodes: [{}, {rsConfig: {priority: 0}}],
- nodeOptions:
- {setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})}}
+ nodeOptions: {
+ setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})},
+ oplogSize: 1024
+ }
});
replTest.startSet();
replTest.initiate();
-// Returns documents from the pre-images collection from 'node' ordered by _id.ts, _id.applyOpsIndex
-// ascending.
-function getPreImages(node) {
- return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName]
- .find()
- .sort({"_id.ts": 1, "_id.applyOpsIndex": 1})
- .toArray();
-}
-
// Asserts that documents in the pre-images collection on the primary node are the same as on a
// secondary node.
function assertPreImagesCollectionOnPrimaryMatchesSecondary() {
@@ -84,7 +78,62 @@ for (const [collectionName, collectionOptions] of [
{_id: 5, v: 3});
}
+ function issueWriteCommandsInTransaction(testDB) {
+ assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]}));
+ assert.commandWorked(coll.insert([{_id: 6, a: 1}, {_id: 7, a: 1}, {_id: 8, a: 1}]));
+
+ const transactionOptions = {readConcern: {level: "majority"}, writeConcern: {w: 2}};
+
+ // Issue commands in a single "applyOps" transaction.
+ TransactionsUtil.runInTransaction(testDB, () => {}, function(db, state) {
+ const coll = db[collectionName];
+ assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.replaceOne({_id: 7}, {a: "Long string"}));
+ assert.commandWorked(coll.deleteOne({_id: 8}));
+ }, transactionOptions);
+
+ // Issue commands in a multiple-"applyOps" transaction.
+ assert.commandWorked(coll.insert({_id: 8, a: 1}));
+ TransactionsUtil.runInTransaction(testDB, () => {}, function(db, state) {
+ const coll = db[collectionName];
+ const largeString = "a".repeat(15 * 1024 * 1024);
+ assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}}));
+ assert.commandWorked(coll.insert({_id: 9, a: largeString}));
+ assert.commandWorked(coll.insert(
+ {_id: 10, a: largeString})); // Should go to the second "applyOps" entry.
+ assert.commandWorked(coll.replaceOne({_id: 7}, {a: "String"}));
+ assert.commandWorked(coll.deleteOne({_id: 8}));
+ }, transactionOptions);
+
+ // Issue commands in a transaction that gets prepared before a commit.
+ assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]}));
+ assert.commandWorked(coll.insert([{_id: 6, a: 1}, {_id: 7, a: 1}, {_id: 8, a: 1}]));
+ const session = testDB.getMongo().startSession();
+ const sessionDb = session.getDatabase(testDB.getName());
+ session.startTransaction();
+ const collInner = sessionDb[coll.getName()];
+ assert.commandWorked(collInner.updateOne({_id: 6}, {$inc: {a: 1}}));
+ assert.commandWorked(collInner.replaceOne({_id: 7}, {a: "Long string"}));
+ assert.commandWorked(collInner.deleteOne({_id: 8}));
+ let prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
+ }
+
+ function issueNonAtomicApplyOpsCommand(testDB) {
+ assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 9}}, {_id: {$lte: 10}}]}));
+ assert.commandWorked(coll.insert([{_id: 9, a: 1}, {_id: 10, a: 1}]));
+ assert.commandWorked(testDB.runCommand({
+ applyOps: [
+ {op: "u", ns: coll.getFullName(), o2: {_id: 9}, o: {$set: {a: 2}}},
+ {op: "d", ns: coll.getFullName(), o: {_id: 10}}
+ ],
+ allowAtomic: false,
+ }));
+ }
+
(function testSteadyStateReplication() {
+ jsTestLog("Testing pre-image replication to secondaries.");
+
// Insert a document.
assert.commandWorked(coll.insert({_id: 1, v: 1, largeField: "AAAAAAAAAAAAAAAAAAAAAAAA"}));
@@ -106,12 +155,26 @@ for (const [collectionName, collectionOptions] of [
// Issue retryable "findAndModify" commands.
issueRetryableFindAndModifyCommands(testDB);
- // Verify that a related change stream pre-images were replicated to the secondary.
+ // Verify that related change stream pre-images were replicated to the secondary.
+ replTest.awaitReplication();
+ assertPreImagesCollectionOnPrimaryMatchesSecondary();
+
+ issueWriteCommandsInTransaction(testDB);
+
+ // Verify that related change stream pre-images were replicated to the secondary.
+ replTest.awaitReplication();
+ assertPreImagesCollectionOnPrimaryMatchesSecondary();
+
+ issueNonAtomicApplyOpsCommand(testDB);
+
+ // Verify that related change stream pre-images were replicated to the secondary.
replTest.awaitReplication();
assertPreImagesCollectionOnPrimaryMatchesSecondary();
})();
(function testInitialSync() {
+ jsTestLog("Testing pre-image replication during the logical initial sync.");
+
// Insert a document for deletion test.
assert.commandWorked(coll.insert({_id: 3, field: "A"}, {writeConcern: {w: 2}}));
@@ -139,8 +202,9 @@ for (const [collectionName, collectionOptions] of [
// Delete the document on the primary node.
assert.commandWorked(coll.deleteOne({_id: 3}, {writeConcern: {w: 2}}));
- // Issue retryable "findAndModify" commands.
issueRetryableFindAndModifyCommands(testDB);
+ issueNonAtomicApplyOpsCommand(testDB);
+ issueWriteCommandsInTransaction(testDB);
// Resume the initial sync process.
assert.commandWorked(initialSyncNode.adminCommand(
@@ -167,6 +231,8 @@ for (const [collectionName, collectionOptions] of [
})();
(function testStartupRecovery() {
+ jsTestLog("Testing pre-image writing during startup recovery.");
+
// Pause check-pointing on the primary node to ensure new pre-images are not flushed to the
// disk.
const pauseCheckpointThreadFailPoint =
@@ -180,8 +246,9 @@ for (const [collectionName, collectionOptions] of [
assert.commandWorked(coll.insert({_id: 4, field: "A"}));
assert.commandWorked(coll.deleteOne({_id: 4}, {writeConcern: {w: 2}}));
- // Issue retryable "findAndModify" commands.
issueRetryableFindAndModifyCommands(testDB);
+ issueNonAtomicApplyOpsCommand(testDB);
+ issueWriteCommandsInTransaction(testDB);
// Do an unclean shutdown of the primary node, and then restart.
replTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
@@ -194,6 +261,5 @@ for (const [collectionName, collectionOptions] of [
assertPreImagesCollectionOnPrimaryMatchesSecondary();
})();
}
-
replTest.stopSet();
})();
diff --git a/jstests/change_streams/change_stream_lookup_preimage_with_resharding.js b/jstests/sharding/change_streams/change_stream_lookup_preimage_with_resharding.js
index ebc33433247..b00f94aed0b 100644
--- a/jstests/change_streams/change_stream_lookup_preimage_with_resharding.js
+++ b/jstests/sharding/change_streams/change_stream_lookup_preimage_with_resharding.js
@@ -5,13 +5,8 @@
*
* @tags: [
* featureFlagChangeStreamPreAndPostImages,
- * featureFlagClusteredIndexes,
- * requires_fcv_52,
+ * requires_fcv_53,
* uses_change_streams,
- * # TODO SERVER-58694: remove this tag.
- * change_stream_does_not_expect_txns,
- * # TODO SERVER-60238: remove this tag.
- * does_not_support_causal_consistency,
* assumes_unsharded_collection,
* assumes_read_preference_unchanged,
* ]
@@ -30,10 +25,11 @@ reshardingTest.setup();
const donorShardNames = reshardingTest.donorShardNames;
const recipientShardNames = reshardingTest.recipientShardNames;
+const collectionName = "test.whileResharding";
// Create a sharded collection with 'oldShardKey' as the shard key.
const coll = reshardingTest.createShardedCollection({
- ns: "test.whileResharding",
+ ns: collectionName,
shardKeyPattern: {oldShardKey: 1},
chunks: [
{min: {oldShardKey: MinKey}, max: {oldShardKey: MaxKey}, shard: donorShardNames[0]},
@@ -82,7 +78,9 @@ assert.commandWorked(coll.getDB().runCommand(
// Insert some documents before resharding the collection so that there is data to clone.
assert.commandWorked(coll.insert([
{_id: 0, annotation: "pre-resharding-insert", oldShardKey: 0, newShardKey: 2},
- {_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3}
+ {_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3},
+ {_id: 2, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3},
+ {_id: 3, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3},
]));
// Verify that 'insert' operations does not record any pre-images.
@@ -121,6 +119,23 @@ reshardingTest.withReshardingInBackground(
assert.commandWorked(
coll.update({_id: 1}, {$set: {annotation: "during-resharding-update"}}));
assert.commandWorked(coll.remove({_id: 1}, {justOne: true}));
+
+ // Perform some operations in a transaction.
+ assert.retryNoExcept(
+ () => {
+ const session = coll.getDB().getMongo().startSession();
+ const sessionDB = session.getDatabase(coll.getDB().getName());
+ const sessionColl = sessionDB.getCollection(coll.getName());
+ session.startTransaction();
+ assert.commandWorked(sessionColl.update(
+ {_id: 2}, {$set: {annotation: "during-resharding-txn-update"}}));
+ assert.commandWorked(sessionColl.remove({_id: 3}, {justOne: true}));
+ session.commitTransaction_forTesting();
+ return true;
+ },
+ "Failed to execute a transaction while resharding was in progress",
+ 10 /*num_attempts*/,
+ 100 /*intervalMS*/);
});
// Verify that after the resharding is complete, the pre-image collection exists on the recipient
@@ -142,7 +157,9 @@ verifyPreImages(donorConn, [
{_id: 1, annotation: "pre-resharding-insert", oldShardKey: 1, newShardKey: 3},
{_id: 0, annotation: "pre-resharding-update", oldShardKey: 0, newShardKey: 2},
{_id: 1, annotation: "pre-resharding-update", oldShardKey: 1, newShardKey: 3},
- {_id: 1, annotation: "during-resharding-update", oldShardKey: 1, newShardKey: 3}
+ {_id: 1, annotation: "during-resharding-update", oldShardKey: 1, newShardKey: 3},
+ {_id: 2, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3},
+ {_id: 3, annotation: "pre-resharding-txn", oldShardKey: 1, newShardKey: 3},
]);
verifyPreImages(recipientConn,
[{_id: 0, annotation: "during-resharding-update", oldShardKey: 0, newShardKey: 2}]);
@@ -176,6 +193,13 @@ verifyChangeStreamEvents(csCursor, [
{opType: "delete", id: 1, prevAnnotation: "during-resharding-update"},
{
opType: "update",
+ id: 2,
+ prevAnnotation: "pre-resharding-txn",
+ curAnnotation: "during-resharding-txn-update"
+ },
+ {opType: "delete", id: 3, prevAnnotation: "pre-resharding-txn"},
+ {
+ opType: "update",
id: 0,
prevAnnotation: "during-resharding-update",
curAnnotation: "post-resharding-update"
diff --git a/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js b/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js
new file mode 100644
index 00000000000..de048fb4ed1
--- /dev/null
+++ b/jstests/sharding/internal_transactions_for_retryable_findAndModify_change_stream_pre_post_images_enabled.js
@@ -0,0 +1,23 @@
+/**
+ * Tests that retryable internal transactions for "findAndModify" commands against collection with
+ * changeStreamPreAndPostImages enabled are retryable.
+ *
+ * @tags: [
+ * requires_fcv_53,
+ * featureFlagInternalTransactions,
+ * featureFlagChangeStreamPreAndPostImages
+ * ]
+ */
+(function() {
+'use strict';
+
+load('jstests/sharding/libs/retryable_internal_transaction_test.js');
+
+const transactionTest =
+ new RetryableInternalTransactionTest({changeStreamPreAndPostImages: {enabled: true}});
+transactionTest.runTestsForAllRetryableInternalTransactionTypes(
+ transactionTest.runFindAndModifyTestsEnableImageCollection);
+transactionTest.runTestsForAllRetryableInternalTransactionTypes(
+ transactionTest.runFindAndModifyTestsDisableImageCollection);
+transactionTest.stop();
+})();
diff --git a/jstests/sharding/libs/retryable_internal_transaction_test.js b/jstests/sharding/libs/retryable_internal_transaction_test.js
index 2d3ac9d4863..327ea4ab417 100644
--- a/jstests/sharding/libs/retryable_internal_transaction_test.js
+++ b/jstests/sharding/libs/retryable_internal_transaction_test.js
@@ -31,7 +31,7 @@ function getOplogEntriesForTxnWithRetries(rs, lsid, txnNumber) {
return oplogEntries;
}
-function RetryableInternalTransactionTest() {
+function RetryableInternalTransactionTest(collectionOptions = {}) {
// Set a large oplogSize since this test runs a find command to get the oplog entries for
// every transaction that it runs including large transactions and with the default oplogSize,
// oplog reading done by the find command may not be able to keep up with the oplog truncation,
@@ -51,10 +51,9 @@ function RetryableInternalTransactionTest() {
const kDbName = "testDb";
const kCollName = "testColl";
const mongosTestDB = st.s.getDB(kDbName);
+ assert.commandWorked(mongosTestDB.createCollection(kCollName, collectionOptions));
const mongosTestColl = mongosTestDB.getCollection(kCollName);
- assert.commandWorked(mongosTestDB.createCollection(kCollName));
-
function makeSessionIdForRetryableInternalTransaction() {
return {id: UUID(), txnNumber: NumberLong(0), txnUUID: UUID()};
}
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 895845a0fde..b8a7b4f28e3 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -80,6 +80,7 @@
namespace mongo {
using repl::DurableOplogEntry;
using repl::MutableOplogEntry;
+using ChangeStreamPreImageRecordingMode = repl::ReplOperation::ChangeStreamPreImageRecordingMode;
const OperationContext::Decoration<boost::optional<repl::DocumentKey>> documentKeyDecoration =
OperationContext::declareDecoration<boost::optional<repl::DocumentKey>>();
@@ -639,6 +640,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
invariant(args.updateArgs->preImageDoc);
operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
if (args.retryableFindAndModifyLocation ==
RetryableFindAndModifyLocation::kSideCollection) {
operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
@@ -655,7 +657,26 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
} else if (args.updateArgs->preImageRecordingEnabledForCollection) {
invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869402,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection);
operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
+ }
+
+ if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection) {
+ invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869403,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
@@ -813,17 +834,33 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
"Deleted document must be present for pre-image recording",
args.deletedDoc);
operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
if (args.retryableFindAndModifyLocation ==
RetryableFindAndModifyLocation::kSideCollection) {
operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
}
}
}
- if (args.preImageRecordingEnabledForCollection) {
+
+ if (args.changeStreamPreAndPostImagesEnabledForCollection) {
+ tassert(5869400,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ tassert(
+ 5869401,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ } else if (args.preImageRecordingEnabledForCollection) {
tassert(5868701,
"Deleted document must be present for pre-image recording",
args.deletedDoc);
operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
}
operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
@@ -1251,6 +1288,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
}
namespace {
+
+/**
+ * Writes pre-images for update/replace/delete operations packed into a single "applyOps" entry to
+ * the change stream pre-images collection if required. The operations are defined by sequence
+ * ['stmtBegin', 'stmtEnd'). 'applyOpsTimestamp' and 'operationTime' are the timestamp and the wall
+ * clock time, respectively, of the "applyOps" entry. A pre-image is recorded for an operation only
+ * if pre-images are enabled for the collection the operation is issued on.
+ */
+void writeChangeStreamPreImagesForApplyOpsEntries(
+ OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>::iterator& stmtBegin,
+ const std::vector<repl::ReplOperation>::iterator& stmtEnd,
+ Timestamp applyOpsTimestamp,
+ Date_t operationTime) {
+ int64_t applyOpsIndex{0};
+ for (auto stmtIterator = stmtBegin; stmtIterator != stmtEnd; ++stmtIterator) {
+ auto& operation = *stmtIterator;
+ if (operation.isChangeStreamPreImageRecordedInPreImagesCollection() &&
+ !operation.getNss().isTemporaryReshardingCollection()) {
+ invariant(operation.getUuid());
+ invariant(!operation.getPreImage().isEmpty());
+ writeToChangeStreamPreImagesCollection(
+ opCtx,
+ ChangeStreamPreImage{
+ ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex},
+ operationTime,
+ operation.getPreImage()});
+ }
+ ++applyOpsIndex;
+ }
+}
+
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
// field. Appends as many operations as possible to the array (and their corresponding statement
// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
@@ -1448,8 +1517,11 @@ int logOplogEntriesForTransaction(
if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
for (auto& statement : *stmts) {
- if (!statement.getPreImage().isEmpty() &&
- statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) {
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ invariant(!statement.getPreImage().isEmpty());
+
// Note that 'needsRetryImage' stores the image kind that needs to stored in the
// image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
// pre-image will be written to the image collection (after all the applyOps oplog
@@ -1573,6 +1645,10 @@ int logOplogEntriesForTransaction(
prevWriteOpTime.writeOpTime.getTimestamp()};
}
+ const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp();
+ writeChangeStreamPreImagesForApplyOpsEntries(
+ opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime());
+
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
numEntriesWritten++;
@@ -1626,7 +1702,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
});
}
-} // namespace
+} // namespace
void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
index 68e2e6949f8..3bdbf3954c5 100644
--- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
@@ -44,6 +44,10 @@ namespace mongo {
void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
const ChangeStreamPreImage& preImage) {
const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace;
+ tassert(5869404,
+ str::stream() << "Invalid pre-image document applyOpsIndex: "
+ << preImage.getId().getApplyOpsIndex(),
+ preImage.getId().getApplyOpsIndex() >= 0);
// This lock acquisition can block on a stronger lock held by another operation modifying the
// pre-images collection. There are no known cases where an operation holding an exclusive lock
@@ -52,7 +56,9 @@ void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX);
UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON());
tassert(5868601,
- "Failed to insert a new document into pre-images collection",
+ str::stream() << "Failed to insert a new document into the pre-images collection: ts: "
+ << preImage.getId().getTs().toString()
+ << ", applyOpsIndex: " << preImage.getId().getApplyOpsIndex(),
!res.existing && !res.upsertedId.isEmpty());
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 1c9a6765f24..6b3b6d8539c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -187,6 +187,7 @@ public:
static constexpr StringData kLsidField = "lsid"_sd;
static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd;
static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd;
+ static constexpr StringData kApplyOpsTsField = "applyOpsTs"_sd;
static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd;
// The target namespace of a rename operation.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 85b6d203147..7fcaf663c44 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -362,6 +362,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// unwinding a transaction.
auto txnOpIndex = input[DocumentSourceChangeStream::kTxnOpIndexField];
auto applyOpsIndex = input[DocumentSourceChangeStream::kApplyOpsIndexField];
+ auto applyOpsEntryTs = input[DocumentSourceChangeStream::kApplyOpsTsField];
// Add some additional fields only relevant to transactions.
if (!txnOpIndex.missing()) {
@@ -417,10 +418,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
} else {
// Set 'kPreImageIdField' to the 'ChangeStreamPreImageId'. The DSCSAddPreImage stage
// will use the id in order to fetch the pre-image from the pre-images collection.
- const auto preImageId =
- ChangeStreamPreImageId(uuid.getUuid(),
- ts.getTimestamp(),
- applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong());
+ const auto preImageId = ChangeStreamPreImageId(
+ uuid.getUuid(),
+ applyOpsEntryTs.missing() ? ts.getTimestamp() : applyOpsEntryTs.getTimestamp(),
+ applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong());
doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON()));
}
}
@@ -463,9 +464,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac
deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString());
- if (_preImageRequested) {
+ if (_preImageRequested || _postImageRequested) {
deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString());
+ deps->fields.insert(DocumentSourceChangeStream::kApplyOpsTsField.toString());
}
return DepsTracker::State::EXHAUSTIVE_ALL;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 8e51e29e7b9..cdd07aa79b9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -268,6 +268,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO
// Initialize iterators at the beginning of the transaction.
_currentApplyOpsIt = _currentApplyOps.getArray().begin();
+ _currentApplyOpsTs = firstTimestamp.getTimestamp();
_currentApplyOpsIndex = 0;
_txnOpIndex = 0;
}
@@ -304,6 +305,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::getNextTrans
BSONType::Array == bsonOp["applyOps"].type());
_currentApplyOps = Value(bsonOp["applyOps"]);
+ _currentApplyOpsTs = applyOpsEntry.getTimestamp();
_currentApplyOpsIt = _currentApplyOps.getArray().begin();
_currentApplyOpsIndex = 0;
}
@@ -338,6 +340,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired
// the current entry.
newDoc.addField(DocumentSourceChangeStream::kApplyOpsIndexField,
Value(static_cast<long long>(applyOpsIndex())));
+ newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs()));
newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime));
newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
index a2659178d6b..f2b17259980 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
@@ -133,6 +133,15 @@ private:
return _currentApplyOpsIndex - 1;
}
+ /**
+ * Returns the timestamp of the "applyOps" entry containing the last operation returned by
+ * 'getNextTransactionOp()'. If 'getNextTransactionOp()' has not been called, returns the
+ * timestamp of the first "applyOps" entry in the transaction.
+ */
+ Timestamp applyOpsTs() const {
+ return _currentApplyOpsTs;
+ }
+
Timestamp clusterTime() const {
return _clusterTime;
}
@@ -194,6 +203,9 @@ private:
// The index of the next entry within the current 'applyOps' array.
size_t _currentApplyOpsIndex;
+ // The timestamp of the current 'applyOps' entry.
+ Timestamp _currentApplyOpsTs;
+
// Our current place within the entire transaction, which may consist of multiple 'applyOps'
// arrays.
size_t _txnOpIndex;
diff --git a/src/mongo/db/repl/apply_ops_command_info.cpp b/src/mongo/db/repl/apply_ops_command_info.cpp
index ced7b83b770..16057086653 100644
--- a/src/mongo/db/repl/apply_ops_command_info.cpp
+++ b/src/mongo/db/repl/apply_ops_command_info.cpp
@@ -130,6 +130,7 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
auto operationDocs = info.getOperations();
bool alwaysUpsert = info.getAlwaysUpsert() && !applyOpsOplogEntry.getTxnNumber();
+ uint64_t applyOpsIdx{0};
for (const auto& operationDoc : operationDocs) {
// Make sure that the inner ops are not malformed or over-specified.
ReplOperation::parse(IDLParserErrorContext("extractOperations"), operationDoc);
@@ -145,11 +146,19 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
builder.appendElementsUnique(topLevelDoc);
auto operation = builder.obj();
- OplogEntry oplogEntry{operation};
- if (oplogEntry.getNeedsRetryImage()) {
- oplogEntry.setTimestampForRetryImage(applyOpsOplogEntry.getTimestamp());
+ operations->emplace_back(operation);
+
+ // Preserve index of operation in the "applyOps" oplog entry, timestamp, and wall clock time
+ // of the "applyOps" entry.
+ auto& lastOperation = operations->back();
+ lastOperation.setApplyOpsIndex(applyOpsIdx);
+ lastOperation.setApplyOpsTimestamp(applyOpsOplogEntry.getTimestamp());
+ lastOperation.setApplyOpsWallClockTime(applyOpsOplogEntry.getWallClockTime());
+ ++applyOpsIdx;
+
+ if (lastOperation.getNeedsRetryImage()) {
+ lastOperation.setTimestampForRetryImage(applyOpsOplogEntry.getTimestamp());
}
- operations->emplace_back(std::move(oplogEntry));
}
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0030db48c13..f307741e720 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1084,10 +1084,11 @@ void writeChangeStreamPreImage(OperationContext* opCtx,
const CollectionPtr& collection,
const mongo::repl::OplogEntry& oplogEntry,
const BSONObj& preImage) {
- ChangeStreamPreImageId preImageId{
- collection->uuid(), oplogEntry.getTimestamp(), 0 /*applyOpsIndex*/};
+ ChangeStreamPreImageId preImageId{collection->uuid(),
+ oplogEntry.getTimestampForPreImage(),
+ static_cast<int64_t>(oplogEntry.getApplyOpsIndex())};
ChangeStreamPreImage preImageDocument{
- std::move(preImageId), oplogEntry.getWallClockTime(), preImage};
+ std::move(preImageId), oplogEntry.getWallClockTimeForPreImage(), preImage};
writeToChangeStreamPreImagesCollection(opCtx, preImageDocument);
}
} // namespace
@@ -1739,7 +1740,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
writeChangeStreamPreImage(opCtx, collection, op, *(result.requestedPreImage));
}
- if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary) {
+ // It is legal for a delete operation on the pre-images collection to delete zero
+ // documents - pre-image collections are not guaranteed to contain the same set of
+ // documents at all times.
+ if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary &&
+ !requestNss.isChangeStreamPreImagesCollection()) {
LOGV2_WARNING(2170002,
"Applied a delete which did not delete anything in steady state "
"replication",
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index ece6da5d3ee..093c4b6620a 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -736,6 +736,37 @@ bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const {
return _entry.isSingleOplogEntryTransactionWithCommand();
}
+uint64_t OplogEntry::getApplyOpsIndex() const {
+ return _applyOpsIndex;
+}
+
+void OplogEntry::setApplyOpsIndex(uint64_t value) {
+ _applyOpsIndex = value;
+}
+
+const boost::optional<mongo::Timestamp>& OplogEntry::getApplyOpsTimestamp() const {
+ return _applyOpsTimestamp;
+}
+
+void OplogEntry::setApplyOpsTimestamp(boost::optional<mongo::Timestamp> value) {
+ _applyOpsTimestamp = value;
+}
+
+const boost::optional<mongo::Date_t>& OplogEntry::getApplyOpsWallClockTime() const {
+ return _applyOpsWallClockTime;
+}
+void OplogEntry::setApplyOpsWallClockTime(boost::optional<mongo::Date_t> value) {
+ _applyOpsWallClockTime = value;
+}
+
+mongo::Timestamp OplogEntry::getTimestampForPreImage() const {
+ return getApplyOpsTimestamp().get_value_or(getTimestamp());
+}
+
+mongo::Date_t OplogEntry::getWallClockTimeForPreImage() const {
+ return getApplyOpsWallClockTime().get_value_or(getWallClockTime());
+}
+
bool OplogEntry::isCrudOpType() const {
return _entry.isCrudOpType();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 8cfdf210ca9..29e8a0a71d9 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -68,6 +68,20 @@ constexpr auto kInitiatingSetMsg = "initiating set"_sd;
class ReplOperation : public DurableReplOperation {
public:
+ /**
+ * The way the change stream pre-images are recorded upon update/replace/delete operation.
+ */
+ enum class ChangeStreamPreImageRecordingMode {
+ // The pre-image is not recorded.
+ kOff,
+
+ // The pre-image is recorded in the change stream pre-images collection.
+ kPreImagesCollection,
+
+ // The pre-image is recorded in the oplog as a separate entry.
+ kOplog,
+ };
+
static ReplOperation parse(const IDLParserErrorContext& ctxt, const BSONObj& bsonObject) {
ReplOperation o;
o.parseProtected(ctxt, bsonObject);
@@ -109,6 +123,54 @@ public:
}
/**
+ * Returns the change stream pre-images recording mode applied for this operation.
+ */
+ ChangeStreamPreImageRecordingMode getChangeStreamPreImageRecordingMode() const {
+ return _preImageRecordingMode;
+ }
+
+ /**
+ * Sets the change stream pre-images recording mode to apply for this operation.
+ */
+ void setChangeStreamPreImageRecordingMode(ChangeStreamPreImageRecordingMode value) {
+ _preImageRecordingMode = value;
+ }
+
+ /**
+ * Returns true if the change stream pre-image is recorded in a dedicated oplog entry for this
+ * operation.
+ */
+ bool isChangeStreamPreImageRecordedInOplog() const {
+ return ReplOperation::ChangeStreamPreImageRecordingMode::kOplog ==
+ getChangeStreamPreImageRecordingMode();
+ }
+
+ /**
+ * Returns true if the change stream pre-image is recorded in the change stream pre-images
+ * collection for this operation.
+ */
+ bool isChangeStreamPreImageRecordedInPreImagesCollection() const {
+ return ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection ==
+ getChangeStreamPreImageRecordingMode();
+ }
+
+ /**
+ * Returns true if the operation is in a retryable internal transaction and pre-image must be
+ * recorded for the operation.
+ */
+ bool isPreImageRecordedForRetryableInternalTransaction() const {
+ return _preImageRecordedForRetryableInternalTransaction;
+ }
+
+ /**
+ * Sets whether the operation is in a retryable internal transaction and pre-image must be
+ * recorded for the operation.
+ */
+ void setPreImageRecordedForRetryableInternalTransaction(bool value = true) {
+ _preImageRecordedForRetryableInternalTransaction = value;
+ }
+
+ /**
* Sets the statement ids for this ReplOperation to 'stmtIds' if it does not contain any
* kUninitializedStmtId (i.e. placeholder statement id).
*/
@@ -134,6 +196,14 @@ private:
// the images should be persisted.
BSONObj _fullPreImage;
BSONObj _fullPostImage;
+
+ // Change stream pre-image recording mode applied to this operation.
+ ChangeStreamPreImageRecordingMode _preImageRecordingMode{
+ ChangeStreamPreImageRecordingMode::kOff};
+
+ // Whether a pre-image must be recorded for this operation since it is in a retryable internal
+ // transaction.
+ bool _preImageRecordedForRetryableInternalTransaction{false};
};
/**
@@ -637,6 +707,46 @@ public:
bool isTerminalApplyOps() const;
bool isSingleOplogEntryTransaction() const;
bool isSingleOplogEntryTransactionWithCommand() const;
+
+ /**
+ * Returns an index of this operation in the "applyOps" entry, if the operation is packed in the
+ * "applyOps" entry. Otherwise returns 0.
+ */
+ uint64_t getApplyOpsIndex() const;
+
+ void setApplyOpsIndex(uint64_t value);
+
+ /**
+ * Returns a timestamp of the "applyOps" entry, if this operation is packed in the "applyOps"
+ * entry. Otherwise returns boost::none.
+ */
+ const boost::optional<mongo::Timestamp>& getApplyOpsTimestamp() const;
+
+ void setApplyOpsTimestamp(boost::optional<mongo::Timestamp> value);
+
+ /**
+ * Returns wall clock time of the "applyOps" entry, if this operation is packed in the
+ * "applyOps" entry. Otherwise returns boost::none.
+ */
+ const boost::optional<mongo::Date_t>& getApplyOpsWallClockTime() const;
+
+ void setApplyOpsWallClockTime(boost::optional<mongo::Date_t> value);
+
+ /**
+ * Returns a timestamp to use for recording of a change stream pre-image in the change stream
+ * pre-images collection. Returns a timestamp of the "applyOps" entry, if this operation is
+ * packed in the "applyOps" entry. Otherwise returns a timestamp of this oplog entry.
+ */
+ mongo::Timestamp getTimestampForPreImage() const;
+
+ /**
+ * Returns a wall clock time to use for recording of a change stream pre-image in the change
+ * stream pre-images collection. Returns a wall clock time of the "applyOps" entry, if this
+ * operation is packed in the "applyOps" entry. Otherwise returns a wall clock time of this
+ * oplog entry.
+ */
+ mongo::Date_t getWallClockTimeForPreImage() const;
+
bool isCrudOpType() const;
bool isUpdateOrDelete() const;
bool isIndexCommandType() const;
@@ -656,6 +766,18 @@ private:
std::shared_ptr<DurableOplogEntry> _preImageOp;
std::shared_ptr<DurableOplogEntry> _postImageOp;
+ // An index of this oplog entry in the associated "applyOps" oplog entry when this entry is
+ // extracted from an "applyOps" oplog entry. Otherwise, the index value must be 0.
+ uint64_t _applyOpsIndex{0};
+
+ // A timestamp of the associated "applyOps" oplog entry when this oplog entry is extracted from
+ // an "applyOps" oplog entry.
+ boost::optional<Timestamp> _applyOpsTimestamp{boost::none};
+
+ // Wall clock time of the associated "applyOps" oplog entry when this oplog entry is extracted
+ // from an "applyOps" oplog entry.
+ boost::optional<Date_t> _applyOpsWallClockTime{boost::none};
+
bool _isForCappedCollection = false;
// During oplog application on secondaries, oplog entries extracted from each applyOps oplog
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 21d2d8b92b9..6b656bfd97b 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1673,7 +1673,10 @@ void TransactionParticipant::Participant::addTransactionOperation(
repl::DurableOplogEntry::getDurableReplOperationSize(operation);
if (!operation.getPreImage().isEmpty()) {
p().transactionOperationBytes += operation.getPreImage().objsize();
- ++p().numberOfPrePostImagesToWrite;
+ if (operation.isChangeStreamPreImageRecordedInOplog() ||
+ operation.isPreImageRecordedForRetryableInternalTransaction()) {
+ ++p().numberOfPrePostImagesToWrite;
+ }
}
if (!operation.getPostImage().isEmpty()) {
p().transactionOperationBytes += operation.getPostImage().objsize();