summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_stream_transaction.js
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-06-07 19:42:10 -0400
committerJustin Seyster <justin.seyster@mongodb.com>2019-06-07 19:51:30 -0400
commitbe23d64cd2aede6745d01445cdfece4b741ddcaf (patch)
treec2bd118ea66e653073c2b150ec37cd701cb80d71 /jstests/noPassthrough/change_stream_transaction.js
parentac88c6d4c085e72b6f0ae364b8d11dc604627efb (diff)
downloadmongo-be23d64cd2aede6745d01445cdfece4b741ddcaf.tar.gz
SERVER-41183 Add coverage for change streams over large transactions
Diffstat (limited to 'jstests/noPassthrough/change_stream_transaction.js')
-rw-r--r--jstests/noPassthrough/change_stream_transaction.js114
1 files changed, 95 insertions, 19 deletions
diff --git a/jstests/noPassthrough/change_stream_transaction.js b/jstests/noPassthrough/change_stream_transaction.js
index 801eab6f0a0..fb244c18366 100644
--- a/jstests/noPassthrough/change_stream_transaction.js
+++ b/jstests/noPassthrough/change_stream_transaction.js
@@ -1,6 +1,12 @@
-// Confirms that change streams only see committed operations for prepared transactions.
-// @tags: [uses_transactions,uses_change_streams,requires_majority_read_concern,
-// exclude_from_large_txns_due_to_change_streams]
+/*
+ * Confirms that change streams only see committed operations for prepared transactions.
+ * @tags: [
+ * uses_transactions,
+ * uses_change_streams,
+ * requires_majority_read_concern,
+ * uses_prepare_transaction,
+ * ]
+ */
(function() {
"use strict";
@@ -10,6 +16,12 @@
const collName = "change_stream_transaction";
/**
+ * This test sets an internal parameter in order to force transactions with more than 4
+ * operations to span multiple oplog entries, making it easier to test that scenario.
+ */
+ const maxOpsInOplogEntry = 4;
+
+ /**
* Asserts that the expected operation type and documentKey are found on the change stream
* cursor. Returns the change stream document.
*/
@@ -126,19 +138,15 @@
prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1);
assertNoChanges(changeStreamCursor);
- // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
- // will block on a prepared transaction. We should also be able to move the check for
- // document existence prior to the transaction commit with this change.
- // Perform a write at writeConcern w: local.
- assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: 1}}));
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}}));
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList);
//
// Commit first transaction and confirm expected changes.
//
assert.commandWorked(PrepareHelpers.commitTransaction(session1, prepareTimestampTxn1));
assertWriteVisibleWithCapture(
- changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList);
- assertWriteVisibleWithCapture(
changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList);
assertWriteVisibleWithCapture(
changeStreamCursor, "insert", {_id: "txn1-doc-2"}, changeList);
@@ -155,19 +163,75 @@
PrepareHelpers.prepareTransaction(session2);
assertNoChanges(changeStreamCursor);
- // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
- // will block on a prepared transaction. We should also be able to move the check for
- // document existence prior to the transaction abort with this change.
- // Perform a write at writeConcern w: local.
- assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: 1}}));
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}}));
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList);
//
// Abort second transaction.
//
- assert.commandWorked(session2.abortTransaction_forTesting());
- assertWriteVisibleWithCapture(
- changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList);
+ session2.abortTransaction_forTesting();
assertNoChanges(changeStreamCursor);
+
+ //
+ // Start transaction 4.
+ //
+ const session4 = db.getMongo().startSession();
+ const sessionDb4 = session4.getDatabase(dbName);
+ const sessionColl4 = sessionDb4[collName];
+ session4.startTransaction({readConcern: {level: "majority"}});
+
+ // Perform enough writes to fill up one applyOps.
+ const txn4Inserts = Array.from({length: maxOpsInOplogEntry},
+ (_, index) => ({_id: {name: "txn4-doc", index: index}}));
+ txn4Inserts.forEach(function(doc) {
+ sessionColl4.insert(doc);
+ assertNoChanges(changeStreamCursor);
+ });
+
+ // Perform enough writes to an unwatched collection to fill up a second applyOps. We
+ // specifically want to test the case where a multi-applyOps transaction has no relevant
+ // updates in its final applyOps.
+ txn4Inserts.forEach(function(doc) {
+ assert.commandWorked(sessionDb4[unwatchedColl.getName()].insert(doc));
+ assertNoChanges(changeStreamCursor);
+ });
+
+ //
+ // Start transaction 5.
+ //
+ const session5 = db.getMongo().startSession();
+ const sessionDb5 = session5.getDatabase(dbName);
+ const sessionColl5 = sessionDb5[collName];
+ session5.startTransaction({readConcern: {level: "majority"}});
+
+ // Perform enough writes to span 3 applyOps entries.
+ const txn5Inserts = Array.from({length: 3 * maxOpsInOplogEntry},
+ (_, index) => ({_id: {name: "txn5-doc", index: index}}));
+ txn5Inserts.forEach(function(doc) {
+ assert.commandWorked(sessionColl5.insert(doc));
+ assertNoChanges(changeStreamCursor);
+ });
+
+ //
+ // Prepare and commit transaction 5.
+ //
+ const prepareTimestampTxn5 = PrepareHelpers.prepareTransaction(session5);
+ assertNoChanges(changeStreamCursor);
+ assert.commandWorked(PrepareHelpers.commitTransaction(session5, prepareTimestampTxn5));
+ txn5Inserts.forEach(function(doc) {
+ assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList);
+ });
+
+ //
+ // Commit transaction 4 without preparing.
+ //
+ session4.commitTransaction();
+ txn4Inserts.forEach(function(doc) {
+ assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList);
+ });
+ assertNoChanges(changeStreamCursor);
+
changeStreamCursor.close();
// Test that change stream resume returns the expected set of documents at each point
@@ -198,7 +262,19 @@
assert.commandWorked(db.dropDatabase());
}
- const rst = new ReplSetTest({nodes: 1});
+ let replSetTestDescription = {nodes: 1};
+ if (!jsTest.options().setParameters.hasOwnProperty(
+ "maxNumberOfTransactionOperationsInSingleOplogEntry")) {
+ // Configure the replica set to use our value for maxOpsInOplogEntry.
+ replSetTestDescription.nodeOptions = {
+ setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: maxOpsInOplogEntry}
+ };
+ } else {
+ // The test is executing in a build variant that already defines its own override value for
+ // maxNumberOfTransactionOperationsInSingleOplogEntry. Even though the build variant's
+ // choice for this override won't test the same edge cases, the test should still succeed.
+ }
+ const rst = new ReplSetTest(replSetTestDescription);
rst.startSet();
rst.initiate();