diff options
Diffstat (limited to 'jstests/noPassthrough/change_stream_transaction.js')
-rw-r--r-- | jstests/noPassthrough/change_stream_transaction.js | 511 |
1 files changed, 251 insertions, 260 deletions
diff --git a/jstests/noPassthrough/change_stream_transaction.js b/jstests/noPassthrough/change_stream_transaction.js index fb244c18366..8de51656cfa 100644 --- a/jstests/noPassthrough/change_stream_transaction.js +++ b/jstests/noPassthrough/change_stream_transaction.js @@ -8,277 +8,268 @@ * ] */ (function() { - "use strict"; - - load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers. - - const dbName = "test"; - const collName = "change_stream_transaction"; - - /** - * This test sets an internal parameter in order to force transactions with more than 4 - * operations to span multiple oplog entries, making it easier to test that scenario. - */ - const maxOpsInOplogEntry = 4; - - /** - * Asserts that the expected operation type and documentKey are found on the change stream - * cursor. Returns the change stream document. - */ - function assertWriteVisible(cursor, operationType, documentKey) { - assert.soon(() => cursor.hasNext()); - const changeDoc = cursor.next(); - assert.eq(operationType, changeDoc.operationType, changeDoc); - assert.eq(documentKey, changeDoc.documentKey, changeDoc); - return changeDoc; - } - - /** - * Asserts that the expected operation type and documentKey are found on the change stream - * cursor. Pushes the corresponding resume token and change stream document to an array. - */ - function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) { - const changeDoc = assertWriteVisible(cursor, operationType, documentKey); - changeList.push(changeDoc); - } - - /** - * Asserts that there are no changes waiting on the change stream cursor. - */ - function assertNoChanges(cursor) { - assert(!cursor.hasNext(), () => { - return "Unexpected change set: " + tojson(cursor.toArray()); - }); - } - - function runTest(conn) { - const db = conn.getDB(dbName); - const coll = db.getCollection(collName); - const unwatchedColl = db.getCollection(collName + "_unwatched"); - let changeList = []; - - // Collections must be created outside of any transaction. - assert.commandWorked(db.createCollection(coll.getName())); - assert.commandWorked(db.createCollection(unwatchedColl.getName())); - - // - // Start transaction 1. - // - const session1 = db.getMongo().startSession(); - const sessionDb1 = session1.getDatabase(dbName); - const sessionColl1 = sessionDb1[collName]; - session1.startTransaction({readConcern: {level: "majority"}}); - - // - // Start transaction 2. - // - const session2 = db.getMongo().startSession(); - const sessionDb2 = session2.getDatabase(dbName); - const sessionColl2 = sessionDb2[collName]; - session2.startTransaction({readConcern: {level: "majority"}}); - - // - // Start transaction 3. - // - const session3 = db.getMongo().startSession(); - const sessionDb3 = session3.getDatabase(dbName); - const sessionColl3 = sessionDb3[collName]; - session3.startTransaction({readConcern: {level: "majority"}}); - - // Open a change stream on the test collection. - const changeStreamCursor = coll.watch(); - - // Insert a document and confirm that the change stream has it. - assert.commandWorked(coll.insert({_id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}})); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "no-txn-doc-1"}, changeList); - - // Insert two documents under each transaction and confirm no change stream updates. - assert.commandWorked(sessionColl1.insert([{_id: "txn1-doc-1"}, {_id: "txn1-doc-2"}])); - assert.commandWorked(sessionColl2.insert([{_id: "txn2-doc-1"}, {_id: "txn2-doc-2"}])); - assertNoChanges(changeStreamCursor); - - // Update one document under each transaction and confirm no change stream updates. - assert.commandWorked(sessionColl1.update({_id: "txn1-doc-1"}, {$set: {"updated": 1}})); - assert.commandWorked(sessionColl2.update({_id: "txn2-doc-1"}, {$set: {"updated": 1}})); - assertNoChanges(changeStreamCursor); +"use strict"; - // Update and then remove the second doc under each transaction and confirm no change stream - // events are seen. - assert.commandWorked( - sessionColl1.update({_id: "txn1-doc-2"}, {$set: {"update-before-delete": 1}})); - assert.commandWorked( - sessionColl2.update({_id: "txn2-doc-2"}, {$set: {"update-before-delete": 1}})); - assert.commandWorked(sessionColl1.remove({_id: "txn1-doc-2"})); - assert.commandWorked(sessionColl2.remove({_id: "txn2-doc-2"})); - assertNoChanges(changeStreamCursor); +load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers. - // Perform a write to the 'session1' transaction in a collection that is not being watched - // by 'changeStreamCursor'. We do not expect to see this write in the change stream either - // now or on commit. - assert.commandWorked( - sessionDb1[unwatchedColl.getName()].insert({_id: "txn1-doc-unwatched-collection"})); - assertNoChanges(changeStreamCursor); +const dbName = "test"; +const collName = "change_stream_transaction"; - // Perform a write to the 'session3' transaction in a collection that is not being watched - // by 'changeStreamCursor'. We do not expect to see this write in the change stream either - // now or on commit. - assert.commandWorked( - sessionDb3[unwatchedColl.getName()].insert({_id: "txn3-doc-unwatched-collection"})); - assertNoChanges(changeStreamCursor); - - // Perform a write outside of a transaction and confirm that the change stream sees only - // this write. - assert.commandWorked(coll.insert({_id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}})); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "no-txn-doc-2"}, changeList); - assertNoChanges(changeStreamCursor); - - let prepareTimestampTxn1; - prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1); - assertNoChanges(changeStreamCursor); - - assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}})); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList); - - // - // Commit first transaction and confirm expected changes. - // - assert.commandWorked(PrepareHelpers.commitTransaction(session1, prepareTimestampTxn1)); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "txn1-doc-2"}, changeList); - assertWriteVisibleWithCapture( - changeStreamCursor, "update", {_id: "txn1-doc-1"}, changeList); - assertWriteVisibleWithCapture( - changeStreamCursor, "update", {_id: "txn1-doc-2"}, changeList); - assertWriteVisibleWithCapture( - changeStreamCursor, "delete", {_id: "txn1-doc-2"}, changeList); - assertNoChanges(changeStreamCursor); - - // Transition the second transaction to prepared. We skip capturing the prepare - // timestamp it is not required for abortTransaction_forTesting(). - PrepareHelpers.prepareTransaction(session2); - assertNoChanges(changeStreamCursor); +/** + * This test sets an internal parameter in order to force transactions with more than 4 + * operations to span multiple oplog entries, making it easier to test that scenario. + */ +const maxOpsInOplogEntry = 4; - assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}})); - assertWriteVisibleWithCapture( - changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList); +/** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Returns the change stream document. + */ +function assertWriteVisible(cursor, operationType, documentKey) { + assert.soon(() => cursor.hasNext()); + const changeDoc = cursor.next(); + assert.eq(operationType, changeDoc.operationType, changeDoc); + assert.eq(documentKey, changeDoc.documentKey, changeDoc); + return changeDoc; +} + +/** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Pushes the corresponding resume token and change stream document to an array. + */ +function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) { + const changeDoc = assertWriteVisible(cursor, operationType, documentKey); + changeList.push(changeDoc); +} - // - // Abort second transaction. - // - session2.abortTransaction_forTesting(); +/** + * Asserts that there are no changes waiting on the change stream cursor. + */ +function assertNoChanges(cursor) { + assert(!cursor.hasNext(), () => { + return "Unexpected change set: " + tojson(cursor.toArray()); + }); +} + +function runTest(conn) { + const db = conn.getDB(dbName); + const coll = db.getCollection(collName); + const unwatchedColl = db.getCollection(collName + "_unwatched"); + let changeList = []; + + // Collections must be created outside of any transaction. + assert.commandWorked(db.createCollection(coll.getName())); + assert.commandWorked(db.createCollection(unwatchedColl.getName())); + + // + // Start transaction 1. + // + const session1 = db.getMongo().startSession(); + const sessionDb1 = session1.getDatabase(dbName); + const sessionColl1 = sessionDb1[collName]; + session1.startTransaction({readConcern: {level: "majority"}}); + + // + // Start transaction 2. + // + const session2 = db.getMongo().startSession(); + const sessionDb2 = session2.getDatabase(dbName); + const sessionColl2 = sessionDb2[collName]; + session2.startTransaction({readConcern: {level: "majority"}}); + + // + // Start transaction 3. + // + const session3 = db.getMongo().startSession(); + const sessionDb3 = session3.getDatabase(dbName); + const sessionColl3 = sessionDb3[collName]; + session3.startTransaction({readConcern: {level: "majority"}}); + + // Open a change stream on the test collection. + const changeStreamCursor = coll.watch(); + + // Insert a document and confirm that the change stream has it. + assert.commandWorked(coll.insert({_id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "no-txn-doc-1"}, changeList); + + // Insert two documents under each transaction and confirm no change stream updates. + assert.commandWorked(sessionColl1.insert([{_id: "txn1-doc-1"}, {_id: "txn1-doc-2"}])); + assert.commandWorked(sessionColl2.insert([{_id: "txn2-doc-1"}, {_id: "txn2-doc-2"}])); + assertNoChanges(changeStreamCursor); + + // Update one document under each transaction and confirm no change stream updates. + assert.commandWorked(sessionColl1.update({_id: "txn1-doc-1"}, {$set: {"updated": 1}})); + assert.commandWorked(sessionColl2.update({_id: "txn2-doc-1"}, {$set: {"updated": 1}})); + assertNoChanges(changeStreamCursor); + + // Update and then remove the second doc under each transaction and confirm no change stream + // events are seen. + assert.commandWorked( + sessionColl1.update({_id: "txn1-doc-2"}, {$set: {"update-before-delete": 1}})); + assert.commandWorked( + sessionColl2.update({_id: "txn2-doc-2"}, {$set: {"update-before-delete": 1}})); + assert.commandWorked(sessionColl1.remove({_id: "txn1-doc-2"})); + assert.commandWorked(sessionColl2.remove({_id: "txn2-doc-2"})); + assertNoChanges(changeStreamCursor); + + // Perform a write to the 'session1' transaction in a collection that is not being watched + // by 'changeStreamCursor'. We do not expect to see this write in the change stream either + // now or on commit. + assert.commandWorked( + sessionDb1[unwatchedColl.getName()].insert({_id: "txn1-doc-unwatched-collection"})); + assertNoChanges(changeStreamCursor); + + // Perform a write to the 'session3' transaction in a collection that is not being watched + // by 'changeStreamCursor'. We do not expect to see this write in the change stream either + // now or on commit. + assert.commandWorked( + sessionDb3[unwatchedColl.getName()].insert({_id: "txn3-doc-unwatched-collection"})); + assertNoChanges(changeStreamCursor); + + // Perform a write outside of a transaction and confirm that the change stream sees only + // this write. + assert.commandWorked(coll.insert({_id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "no-txn-doc-2"}, changeList); + assertNoChanges(changeStreamCursor); + + let prepareTimestampTxn1; + prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1); + assertNoChanges(changeStreamCursor); + + assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList); + + // + // Commit first transaction and confirm expected changes. + // + assert.commandWorked(PrepareHelpers.commitTransaction(session1, prepareTimestampTxn1)); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "txn1-doc-2"}, changeList); + assertWriteVisibleWithCapture(changeStreamCursor, "update", {_id: "txn1-doc-1"}, changeList); + assertWriteVisibleWithCapture(changeStreamCursor, "update", {_id: "txn1-doc-2"}, changeList); + assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: "txn1-doc-2"}, changeList); + assertNoChanges(changeStreamCursor); + + // Transition the second transaction to prepared. We skip capturing the prepare + // timestamp it is not required for abortTransaction_forTesting(). + PrepareHelpers.prepareTransaction(session2); + assertNoChanges(changeStreamCursor); + + assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture(changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList); + + // + // Abort second transaction. + // + session2.abortTransaction_forTesting(); + assertNoChanges(changeStreamCursor); + + // + // Start transaction 4. + // + const session4 = db.getMongo().startSession(); + const sessionDb4 = session4.getDatabase(dbName); + const sessionColl4 = sessionDb4[collName]; + session4.startTransaction({readConcern: {level: "majority"}}); + + // Perform enough writes to fill up one applyOps. + const txn4Inserts = Array.from({length: maxOpsInOplogEntry}, + (_, index) => ({_id: {name: "txn4-doc", index: index}})); + txn4Inserts.forEach(function(doc) { + sessionColl4.insert(doc); assertNoChanges(changeStreamCursor); + }); - // - // Start transaction 4. - // - const session4 = db.getMongo().startSession(); - const sessionDb4 = session4.getDatabase(dbName); - const sessionColl4 = sessionDb4[collName]; - session4.startTransaction({readConcern: {level: "majority"}}); - - // Perform enough writes to fill up one applyOps. - const txn4Inserts = Array.from({length: maxOpsInOplogEntry}, - (_, index) => ({_id: {name: "txn4-doc", index: index}})); - txn4Inserts.forEach(function(doc) { - sessionColl4.insert(doc); - assertNoChanges(changeStreamCursor); - }); - - // Perform enough writes to an unwatched collection to fill up a second applyOps. We - // specifically want to test the case where a multi-applyOps transaction has no relevant - // updates in its final applyOps. - txn4Inserts.forEach(function(doc) { - assert.commandWorked(sessionDb4[unwatchedColl.getName()].insert(doc)); - assertNoChanges(changeStreamCursor); - }); - - // - // Start transaction 5. - // - const session5 = db.getMongo().startSession(); - const sessionDb5 = session5.getDatabase(dbName); - const sessionColl5 = sessionDb5[collName]; - session5.startTransaction({readConcern: {level: "majority"}}); - - // Perform enough writes to span 3 applyOps entries. - const txn5Inserts = Array.from({length: 3 * maxOpsInOplogEntry}, - (_, index) => ({_id: {name: "txn5-doc", index: index}})); - txn5Inserts.forEach(function(doc) { - assert.commandWorked(sessionColl5.insert(doc)); - assertNoChanges(changeStreamCursor); - }); - - // - // Prepare and commit transaction 5. - // - const prepareTimestampTxn5 = PrepareHelpers.prepareTransaction(session5); + // Perform enough writes to an unwatched collection to fill up a second applyOps. We + // specifically want to test the case where a multi-applyOps transaction has no relevant + // updates in its final applyOps. + txn4Inserts.forEach(function(doc) { + assert.commandWorked(sessionDb4[unwatchedColl.getName()].insert(doc)); assertNoChanges(changeStreamCursor); - assert.commandWorked(PrepareHelpers.commitTransaction(session5, prepareTimestampTxn5)); - txn5Inserts.forEach(function(doc) { - assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList); - }); - - // - // Commit transaction 4 without preparing. - // - session4.commitTransaction(); - txn4Inserts.forEach(function(doc) { - assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList); - }); + }); + + // + // Start transaction 5. + // + const session5 = db.getMongo().startSession(); + const sessionDb5 = session5.getDatabase(dbName); + const sessionColl5 = sessionDb5[collName]; + session5.startTransaction({readConcern: {level: "majority"}}); + + // Perform enough writes to span 3 applyOps entries. + const txn5Inserts = Array.from({length: 3 * maxOpsInOplogEntry}, + (_, index) => ({_id: {name: "txn5-doc", index: index}})); + txn5Inserts.forEach(function(doc) { + assert.commandWorked(sessionColl5.insert(doc)); assertNoChanges(changeStreamCursor); - - changeStreamCursor.close(); - - // Test that change stream resume returns the expected set of documents at each point - // captured by this test. - for (let i = 0; i < changeList.length; ++i) { - const resumeCursor = coll.watch([], {startAfter: changeList[i]._id}); - - for (let x = (i + 1); x < changeList.length; ++x) { - const expectedChangeDoc = changeList[x]; - assertWriteVisible( - resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey); - } - - assertNoChanges(resumeCursor); - resumeCursor.close(); + }); + + // + // Prepare and commit transaction 5. + // + const prepareTimestampTxn5 = PrepareHelpers.prepareTransaction(session5); + assertNoChanges(changeStreamCursor); + assert.commandWorked(PrepareHelpers.commitTransaction(session5, prepareTimestampTxn5)); + txn5Inserts.forEach(function(doc) { + assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList); + }); + + // + // Commit transaction 4 without preparing. + // + session4.commitTransaction(); + txn4Inserts.forEach(function(doc) { + assertWriteVisibleWithCapture(changeStreamCursor, "insert", doc, changeList); + }); + assertNoChanges(changeStreamCursor); + + changeStreamCursor.close(); + + // Test that change stream resume returns the expected set of documents at each point + // captured by this test. + for (let i = 0; i < changeList.length; ++i) { + const resumeCursor = coll.watch([], {startAfter: changeList[i]._id}); + + for (let x = (i + 1); x < changeList.length; ++x) { + const expectedChangeDoc = changeList[x]; + assertWriteVisible( + resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey); } - // - // Prepare and commit the third transaction and confirm that there are no visible changes. - // - let prepareTimestampTxn3; - prepareTimestampTxn3 = PrepareHelpers.prepareTransaction(session3); - assertNoChanges(changeStreamCursor); - - assert.commandWorked(PrepareHelpers.commitTransaction(session3, prepareTimestampTxn3)); - assertNoChanges(changeStreamCursor); - - assert.commandWorked(db.dropDatabase()); - } - - let replSetTestDescription = {nodes: 1}; - if (!jsTest.options().setParameters.hasOwnProperty( - "maxNumberOfTransactionOperationsInSingleOplogEntry")) { - // Configure the replica set to use our value for maxOpsInOplogEntry. - replSetTestDescription.nodeOptions = { - setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: maxOpsInOplogEntry} - }; - } else { - // The test is executing in a build variant that already defines its own override value for - // maxNumberOfTransactionOperationsInSingleOplogEntry. Even though the build variant's - // choice for this override won't test the same edge cases, the test should still succeed. + assertNoChanges(resumeCursor); + resumeCursor.close(); } - const rst = new ReplSetTest(replSetTestDescription); - rst.startSet(); - rst.initiate(); - - runTest(rst.getPrimary()); - rst.stopSet(); + // + // Prepare and commit the third transaction and confirm that there are no visible changes. + // + let prepareTimestampTxn3; + prepareTimestampTxn3 = PrepareHelpers.prepareTransaction(session3); + assertNoChanges(changeStreamCursor); + + assert.commandWorked(PrepareHelpers.commitTransaction(session3, prepareTimestampTxn3)); + assertNoChanges(changeStreamCursor); + + assert.commandWorked(db.dropDatabase()); +} + +let replSetTestDescription = {nodes: 1}; +if (!jsTest.options().setParameters.hasOwnProperty( + "maxNumberOfTransactionOperationsInSingleOplogEntry")) { + // Configure the replica set to use our value for maxOpsInOplogEntry. + replSetTestDescription.nodeOptions = { + setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: maxOpsInOplogEntry} + }; +} else { + // The test is executing in a build variant that already defines its own override value for + // maxNumberOfTransactionOperationsInSingleOplogEntry. Even though the build variant's + // choice for this override won't test the same edge cases, the test should still succeed. +} +const rst = new ReplSetTest(replSetTestDescription); +rst.startSet(); +rst.initiate(); + +runTest(rst.getPrimary()); + +rst.stopSet(); })(); |