summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/change_streams_multi_version_transaction.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/multiVersion/change_streams_multi_version_transaction.js')
-rw-r--r--jstests/multiVersion/change_streams_multi_version_transaction.js224
1 files changed, 111 insertions, 113 deletions
diff --git a/jstests/multiVersion/change_streams_multi_version_transaction.js b/jstests/multiVersion/change_streams_multi_version_transaction.js
index 1c2bfbd7e33..0ec28a63235 100644
--- a/jstests/multiVersion/change_streams_multi_version_transaction.js
+++ b/jstests/multiVersion/change_streams_multi_version_transaction.js
@@ -4,127 +4,125 @@
//
// @tags: [uses_change_streams, uses_transactions, requires_replication]
(function() {
- "use strict";
+"use strict";
- load("jstests/libs/feature_compatibility_version.js");
- load('jstests/multiVersion/libs/multi_rs.js'); // For upgradeSet.
- load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+load("jstests/libs/feature_compatibility_version.js");
+load('jstests/multiVersion/libs/multi_rs.js'); // For upgradeSet.
+load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- const rst = new ReplSetTest({
- nodes: 2,
- nopeOptions: {binVersion: "last-stable"},
- });
+const rst = new ReplSetTest({
+ nodes: 2,
+ nopeOptions: {binVersion: "last-stable"},
+});
- if (!startSetIfSupportsReadMajority(rst)) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- rst.stopSet();
- return;
- }
+if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+}
- rst.initiate();
+rst.initiate();
- const dbName = jsTestName();
- const watchedCollName = "change_stream_watched";
- const unwatchedCollName = "change_stream_unwatched";
+const dbName = jsTestName();
+const watchedCollName = "change_stream_watched";
+const unwatchedCollName = "change_stream_unwatched";
- rst.getPrimary().getDB(dbName).createCollection(watchedCollName);
- rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName);
+rst.getPrimary().getDB(dbName).createCollection(watchedCollName);
+rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName);
- // Calls next() on a change stream cursor 'n' times and returns an array with the results.
- function getChangeStreamResults(cursor, n) {
- let results = [];
- for (let i = 0; i < n; ++i) {
- assert.soon(() => cursor.hasNext(), "Timed out waiting for change stream result " + i);
- results.push(cursor.next());
- }
- assert(!cursor.hasNext()); // The change stream should always have exactly 'n' results.
- return results;
+// Calls next() on a change stream cursor 'n' times and returns an array with the results.
+function getChangeStreamResults(cursor, n) {
+ let results = [];
+ for (let i = 0; i < n; ++i) {
+ assert.soon(() => cursor.hasNext(), "Timed out waiting for change stream result " + i);
+ results.push(cursor.next());
}
-
- // Compare expected changes with output from a change stream, failing an assertion if they do
- // not match.
- function compareChanges(expectedChanges, observedChanges) {
- assert.eq(expectedChanges.length, observedChanges.length);
- for (let i = 0; i < expectedChanges.length; ++i) {
- assert.eq(expectedChanges[i].operationType, observedChanges[i].operationType);
- if (expectedChanges[i].hasOwnProperty("fullDocument")) {
- assert.eq(expectedChanges[i].fullDocument, observedChanges[i].fullDocument);
- }
- if (expectedChanges[i].hasOwnProperty("updateDescription")) {
- assert.eq(expectedChanges[i].updateDescription,
- observedChanges[i].updateDescription);
- }
- if (expectedChanges[i].hasOwnProperty("documentKey")) {
- assert.eq(expectedChanges[i].documentKey, observedChanges[i].documentKey);
- }
+ assert(!cursor.hasNext()); // The change stream should always have exactly 'n' results.
+ return results;
+}
+
+// Compare expected changes with output from a change stream, failing an assertion if they do
+// not match.
+function compareChanges(expectedChanges, observedChanges) {
+ assert.eq(expectedChanges.length, observedChanges.length);
+ for (let i = 0; i < expectedChanges.length; ++i) {
+ assert.eq(expectedChanges[i].operationType, observedChanges[i].operationType);
+ if (expectedChanges[i].hasOwnProperty("fullDocument")) {
+ assert.eq(expectedChanges[i].fullDocument, observedChanges[i].fullDocument);
+ }
+ if (expectedChanges[i].hasOwnProperty("updateDescription")) {
+ assert.eq(expectedChanges[i].updateDescription, observedChanges[i].updateDescription);
+ }
+ if (expectedChanges[i].hasOwnProperty("documentKey")) {
+ assert.eq(expectedChanges[i].documentKey, observedChanges[i].documentKey);
}
}
-
- // Opens a $changeStream and then performs inserts, deletes, updates both within a transaction
- // and outside the transaction. Leaves all collections empty when done.
- function performDBOps(mongod) {
- const session = mongod.startSession();
- session.startTransaction();
-
- const watchedColl = session.getDatabase(dbName)[watchedCollName];
- assert.commandWorked(watchedColl.insert({_id: 1}));
- assert.commandWorked(watchedColl.updateOne({_id: 1}, {$set: {a: 1}}));
- assert.commandWorked(watchedColl.remove({_id: 1}));
-
- const unwatchedColl = session.getDatabase(dbName)[unwatchedCollName];
- assert.commandWorked(unwatchedColl.insert({_id: 1}));
- assert.commandWorked(unwatchedColl.remove({_id: 1}));
-
- const watchedCollNoTxn = mongod.getDB(dbName)[watchedCollName];
- assert.commandWorked(watchedCollNoTxn.insert({_id: 2}));
- assert.commandWorked(watchedCollNoTxn.remove({_id: 2}));
-
- session.commitTransaction();
- }
-
- // Resume a change stream from each of the resume tokens in the 'changeStreamDocs' array and
- // verify that we always see the same set of changes.
- function resumeChangeStreamFromEachToken(mongod, changeStreamDocs, expectedChanges) {
- changeStreamDocs.forEach(function(changeDoc, i) {
- const testDB = mongod.getDB(dbName);
- const resumedCursor = testDB[watchedCollName].watch([], {resumeAfter: changeDoc._id});
-
- // Resuming from document 'i' should return all the documents from 'i' + 1 to the end of
- // the array.
- const expectedChangesAfterResumeToken = expectedChanges.slice(i + 1);
- compareChanges(
- expectedChangesAfterResumeToken,
- getChangeStreamResults(resumedCursor, expectedChangesAfterResumeToken.length));
- });
- }
-
- const expectedChanges = [
- {operationType: "insert", fullDocument: {_id: 2}},
- {operationType: "delete", documentKey: {_id: 2}},
- {operationType: "insert", fullDocument: {_id: 1}},
- {operationType: "update", updateDescription: {updatedFields: {a: 1}, removedFields: []}},
- {operationType: "delete", documentKey: {_id: 1}},
- ];
-
- // Create the original change stream, verify it gives us the changes we expect, and verify that
- // we can correctly resume from any resume token.
- const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch();
- performDBOps(rst.getPrimary());
- const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length);
- compareChanges(expectedChanges, changeStreamDocs);
- resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
- // Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that
- // we can correctly resume from any resume token.
- rst.upgradeSet({binVersion: "latest"});
- resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
- // Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any
- // resume token.
- assert.commandWorked(
- rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV}));
- checkFCV(rst.getPrimary().getDB("admin"), latestFCV);
- resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
- rst.stopSet();
+}
+
+// Opens a $changeStream and then performs inserts, deletes, updates both within a transaction
+// and outside the transaction. Leaves all collections empty when done.
+function performDBOps(mongod) {
+ const session = mongod.startSession();
+ session.startTransaction();
+
+ const watchedColl = session.getDatabase(dbName)[watchedCollName];
+ assert.commandWorked(watchedColl.insert({_id: 1}));
+ assert.commandWorked(watchedColl.updateOne({_id: 1}, {$set: {a: 1}}));
+ assert.commandWorked(watchedColl.remove({_id: 1}));
+
+ const unwatchedColl = session.getDatabase(dbName)[unwatchedCollName];
+ assert.commandWorked(unwatchedColl.insert({_id: 1}));
+ assert.commandWorked(unwatchedColl.remove({_id: 1}));
+
+ const watchedCollNoTxn = mongod.getDB(dbName)[watchedCollName];
+ assert.commandWorked(watchedCollNoTxn.insert({_id: 2}));
+ assert.commandWorked(watchedCollNoTxn.remove({_id: 2}));
+
+ session.commitTransaction();
+}
+
+// Resume a change stream from each of the resume tokens in the 'changeStreamDocs' array and
+// verify that we always see the same set of changes.
+function resumeChangeStreamFromEachToken(mongod, changeStreamDocs, expectedChanges) {
+ changeStreamDocs.forEach(function(changeDoc, i) {
+ const testDB = mongod.getDB(dbName);
+ const resumedCursor = testDB[watchedCollName].watch([], {resumeAfter: changeDoc._id});
+
+ // Resuming from document 'i' should return all the documents from 'i' + 1 to the end of
+ // the array.
+ const expectedChangesAfterResumeToken = expectedChanges.slice(i + 1);
+ compareChanges(
+ expectedChangesAfterResumeToken,
+ getChangeStreamResults(resumedCursor, expectedChangesAfterResumeToken.length));
+ });
+}
+
+const expectedChanges = [
+ {operationType: "insert", fullDocument: {_id: 2}},
+ {operationType: "delete", documentKey: {_id: 2}},
+ {operationType: "insert", fullDocument: {_id: 1}},
+ {operationType: "update", updateDescription: {updatedFields: {a: 1}, removedFields: []}},
+ {operationType: "delete", documentKey: {_id: 1}},
+];
+
+// Create the original change stream, verify it gives us the changes we expect, and verify that
+// we can correctly resume from any resume token.
+const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch();
+performDBOps(rst.getPrimary());
+const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length);
+compareChanges(expectedChanges, changeStreamDocs);
+resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+// Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that
+// we can correctly resume from any resume token.
+rst.upgradeSet({binVersion: "latest"});
+resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+// Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any
+// resume token.
+assert.commandWorked(rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+checkFCV(rst.getPrimary().getDB("admin"), latestFCV);
+resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+rst.stopSet();
}());