diff options
-rw-r--r-- | jstests/multiVersion/change_streams_multi_version_transaction.js | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/jstests/multiVersion/change_streams_multi_version_transaction.js b/jstests/multiVersion/change_streams_multi_version_transaction.js new file mode 100644 index 00000000000..1c2bfbd7e33 --- /dev/null +++ b/jstests/multiVersion/change_streams_multi_version_transaction.js @@ -0,0 +1,130 @@ +// Test that resume tokens from a replica set running the "last-stable" mongod can be used to resume +// a change stream after upgrading the replica set to the "latest" mongod, even when the change +// stream includes multi-statement transactions. +// +// @tags: [uses_change_streams, uses_transactions, requires_replication] +(function() { + "use strict"; + + load("jstests/libs/feature_compatibility_version.js"); + load('jstests/multiVersion/libs/multi_rs.js'); // For upgradeSet. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + const rst = new ReplSetTest({ + nodes: 2, + nopeOptions: {binVersion: "last-stable"}, + }); + + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + + rst.initiate(); + + const dbName = jsTestName(); + const watchedCollName = "change_stream_watched"; + const unwatchedCollName = "change_stream_unwatched"; + + rst.getPrimary().getDB(dbName).createCollection(watchedCollName); + rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName); + + // Calls next() on a change stream cursor 'n' times and returns an array with the results. + function getChangeStreamResults(cursor, n) { + let results = []; + for (let i = 0; i < n; ++i) { + assert.soon(() => cursor.hasNext(), "Timed out waiting for change stream result " + i); + results.push(cursor.next()); + } + assert(!cursor.hasNext()); // The change stream should always have exactly 'n' results. + return results; + } + + // Compare expected changes with output from a change stream, failing an assertion if they do + // not match. + function compareChanges(expectedChanges, observedChanges) { + assert.eq(expectedChanges.length, observedChanges.length); + for (let i = 0; i < expectedChanges.length; ++i) { + assert.eq(expectedChanges[i].operationType, observedChanges[i].operationType); + if (expectedChanges[i].hasOwnProperty("fullDocument")) { + assert.eq(expectedChanges[i].fullDocument, observedChanges[i].fullDocument); + } + if (expectedChanges[i].hasOwnProperty("updateDescription")) { + assert.eq(expectedChanges[i].updateDescription, + observedChanges[i].updateDescription); + } + if (expectedChanges[i].hasOwnProperty("documentKey")) { + assert.eq(expectedChanges[i].documentKey, observedChanges[i].documentKey); + } + } + } + + // Opens a $changeStream and then performs inserts, deletes, updates both within a transaction + // and outside the transaction. Leaves all collections empty when done. + function performDBOps(mongod) { + const session = mongod.startSession(); + session.startTransaction(); + + const watchedColl = session.getDatabase(dbName)[watchedCollName]; + assert.commandWorked(watchedColl.insert({_id: 1})); + assert.commandWorked(watchedColl.updateOne({_id: 1}, {$set: {a: 1}})); + assert.commandWorked(watchedColl.remove({_id: 1})); + + const unwatchedColl = session.getDatabase(dbName)[unwatchedCollName]; + assert.commandWorked(unwatchedColl.insert({_id: 1})); + assert.commandWorked(unwatchedColl.remove({_id: 1})); + + const watchedCollNoTxn = mongod.getDB(dbName)[watchedCollName]; + assert.commandWorked(watchedCollNoTxn.insert({_id: 2})); + assert.commandWorked(watchedCollNoTxn.remove({_id: 2})); + + session.commitTransaction(); + } + + // Resume a change stream from each of the resume tokens in the 'changeStreamDocs' array and + // verify that we always see the same set of changes. + function resumeChangeStreamFromEachToken(mongod, changeStreamDocs, expectedChanges) { + changeStreamDocs.forEach(function(changeDoc, i) { + const testDB = mongod.getDB(dbName); + const resumedCursor = testDB[watchedCollName].watch([], {resumeAfter: changeDoc._id}); + + // Resuming from document 'i' should return all the documents from 'i' + 1 to the end of + // the array. + const expectedChangesAfterResumeToken = expectedChanges.slice(i + 1); + compareChanges( + expectedChangesAfterResumeToken, + getChangeStreamResults(resumedCursor, expectedChangesAfterResumeToken.length)); + }); + } + + const expectedChanges = [ + {operationType: "insert", fullDocument: {_id: 2}}, + {operationType: "delete", documentKey: {_id: 2}}, + {operationType: "insert", fullDocument: {_id: 1}}, + {operationType: "update", updateDescription: {updatedFields: {a: 1}, removedFields: []}}, + {operationType: "delete", documentKey: {_id: 1}}, + ]; + + // Create the original change stream, verify it gives us the changes we expect, and verify that + // we can correctly resume from any resume token. + const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch(); + performDBOps(rst.getPrimary()); + const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length); + compareChanges(expectedChanges, changeStreamDocs); + resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges); + + // Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that + // we can correctly resume from any resume token. + rst.upgradeSet({binVersion: "latest"}); + resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges); + + // Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any + // resume token. + assert.commandWorked( + rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV})); + checkFCV(rst.getPrimary().getDB("admin"), latestFCV); + resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges); + + rst.stopSet(); +}()); |