summaryrefslogtreecommitdiff
path: root/jstests
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2020-07-23 20:55:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-03 15:45:52 +0000
commit9cfe13115e92a43d1b9273ee1d5817d548264ba7 (patch)
tree97ae08abd2668b7933c6a125f13ac700b9ae7fcd /jstests
parenta0971e8bdbb7e6015897cf3ccc3345ce38631896 (diff)
downloadmongo-9cfe13115e92a43d1b9273ee1d5817d548264ba7.tar.gz
SERVER-49067 Extend genericChangeStreams multiversion tests to use "last-lts" and "last-continuous"
Diffstat (limited to 'jstests')
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js174
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js189
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js81
-rw-r--r--jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js36
4 files changed, 258 insertions, 222 deletions
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js b/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js
index 035f884eaae..9b9688f3143 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_feature_compatibility_version.js
@@ -8,97 +8,105 @@
load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
-const rst = new ReplSetTest({
- nodes: 2,
- nodeOptions: {binVersion: "last-lts"},
-});
-
-if (!startSetIfSupportsReadMajority(rst)) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- rst.stopSet();
- return;
-}
-
-rst.initiate();
-
-let testDB = rst.getPrimary().getDB(jsTestName());
-let coll = testDB.change_stream_upgrade;
-
-// Open a change stream against a 4.0 binary. We will use the resume token from this stream to
-// resume the stream once the set has been upgraded.
-let streamStartedOnOldVersion = coll.watch();
-assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));
-
-assert.soon(() => streamStartedOnOldVersion.hasNext());
-let change = streamStartedOnOldVersion.next();
-assert.eq(change.operationType, "insert", tojson(change));
-assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
-const resumeTokenFromLastLTS = change._id;
+function runTest(downgradeVersion) {
+ jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
+ const rst = new ReplSetTest({
+ nodes: 2,
+ nodeOptions: {binVersion: downgradeVersion},
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
-assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
-// Upgrade the set to the new binary version, but keep the feature compatibility version at
-// last-lts.
-rst.upgradeSet({binVersion: "latest"});
-testDB = rst.getPrimary().getDB(jsTestName());
-coll = testDB.change_stream_upgrade;
+ rst.initiate();
-// Test that we can resume the stream on the new binaries.
-streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastLTS});
-assert.soon(() => streamStartedOnOldVersion.hasNext());
-change = streamStartedOnOldVersion.next();
-assert.eq(change.operationType, "insert", tojson(change));
-assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+ let testDB = rst.getPrimary().getDB(jsTestName());
+ let coll = testDB.change_stream_upgrade;
-let streamStartedOnNewVersionOldFCV = coll.watch();
+ // Open a change stream against a downgraded binary. We will use the resume token from this
+ // stream to resume the stream once the set has been upgraded.
+ let streamStartedOnOldVersion = coll.watch();
+ assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));
-assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ let change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
+ const resumeTokenFromDowngradeVersion = change._id;
+
+ assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
+ // Upgrade the set to the latest binary version, but keep the feature compatibility version at
+ // 'downgradeVersion'.
+ rst.upgradeSet({binVersion: "latest"});
+ testDB = rst.getPrimary().getDB(jsTestName());
+ coll = testDB.change_stream_upgrade;
+
+ // Test that we can resume the stream on the new binaries.
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromDowngradeVersion});
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+
+ let streamStartedOnNewVersionOldFCV = coll.watch();
+
+ assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));
+
+ let resumeTokenFromNewVersionOldFCV;
+ [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(
+ change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
+ if (resumeTokenFromNewVersionOldFCV === undefined) {
+ resumeTokenFromNewVersionOldFCV = change._id;
+ } else {
+ assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
+ }
+ });
+
+ // Explicitly set feature compatibility version to the latest FCV.
+ assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+
+ const streamStartedOnNewVersion = coll.watch();
+
+ // Test that we can still resume with the token from the old version. We should see the same
+ // document again.
+ streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromDowngradeVersion});
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
-let resumeTokenFromNewVersionOldFCV;
-[streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
- assert.soon(() => stream.hasNext());
- change = stream.next();
+ assert.soon(() => streamStartedOnOldVersion.hasNext());
+ change = streamStartedOnOldVersion.next();
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
- if (resumeTokenFromNewVersionOldFCV === undefined) {
- resumeTokenFromNewVersionOldFCV = change._id;
- } else {
- assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
+
+ assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
+ const resumedStreamOnNewVersion =
+ coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});
+
+ // Test that all open streams continue to produce change events, and that the newly resumed
+ // stream sees the write that just happened since it comes after the resume token used.
+ for (let stream of [streamStartedOnOldVersion,
+ streamStartedOnNewVersionOldFCV,
+ streamStartedOnNewVersion,
+ resumedStreamOnNewVersion]) {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
+ stream.close();
}
-});
-
-// Explicitly set feature compatibility version to the latest FCV.
-assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
-
-const streamStartedOnNewVersion = coll.watch();
-
-// Test that we can still resume with the token from the old version. We should see the same
-// document again.
-streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastLTS});
-assert.soon(() => streamStartedOnOldVersion.hasNext());
-change = streamStartedOnOldVersion.next();
-assert.eq(change.operationType, "insert", tojson(change));
-assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
-
-assert.soon(() => streamStartedOnOldVersion.hasNext());
-change = streamStartedOnOldVersion.next();
-assert.eq(change.operationType, "insert", tojson(change));
-assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
-
-assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
-const resumedStreamOnNewVersion = coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});
-
-// Test that all open streams continue to produce change events, and that the newly resumed
-// stream sees the write that just happened since it comes after the resume token used.
-for (let stream of [streamStartedOnOldVersion,
- streamStartedOnNewVersionOldFCV,
- streamStartedOnNewVersion,
- resumedStreamOnNewVersion]) {
- assert.soon(() => stream.hasNext());
- change = stream.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
- stream.close();
+
+ rst.stopSet();
}
-rst.stopSet();
+runTest("last-continuous");
+runTest("last-lts");
}());
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
index fd39ec6a299..6281e883e8f 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
@@ -17,101 +17,108 @@ const dbName = "test";
const collName = "change_streams_multi_version_sortkey";
const namespace = dbName + "." + collName;
-// Start a sharded cluster in which all mongod and mongos processes are "last-lts" binVersion. We
-// set "writePeriodicNoops" to write to the oplog every 1 second, which ensures that test change
-// streams do not wait for longer than 1 second if one of the shards has no changes to report.
-var st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 2,
- binVersion: "last-lts",
- setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
- },
- other: {mongosOptions: {binVersion: "last-lts"}}
-});
-
-let mongosConn = st.s;
-assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).createIndex({shard: 1}));
-st.ensurePrimaryShard(dbName, st.shard0.shardName);
-
-// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
-// documents and one that contains all {shard: 2} documents.
-st.shardColl(collName,
- {shard: 1} /* Shard key */,
- {shard: 2} /* Split at */,
- {shard: 2} /* Move the chunk containing {shard: 2} to its own shard */,
- dbName,
- true /* Wait until documents orphaned by the move get deleted */);
-
-// Insert new documents on both shards, verify that each insertion outputs a result from the
-// 'changeStream' cursor, verify that the change stream results have monotonically increasing
-// timestamps, and return the resume token.
-var nextId = 0;
-function insertAndValidateChanges(coll, changeStream) {
- const docsToInsert =
- Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2, val: i}));
- nextId += docsToInsert.length;
-
- assert.commandWorked(coll.insert(docsToInsert));
-
- const changeList = [];
- assert.soon(function() {
- while (changeStream.hasNext()) {
- const change = changeStream.next();
- changeList.push(change);
+function runTest(downgradeVersion) {
+ jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
+ // Start a sharded cluster in which all mongod and mongos processes are of the downgraded
+ // binVersion. We set "writePeriodicNoops" to write to the oplog every 1 second, which ensures
+ // that test change streams do not wait for longer than 1 second if one of the shards has no
+ // changes to report.
+ var st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 2,
+ binVersion: downgradeVersion,
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ },
+ other: {mongosOptions: {binVersion: downgradeVersion}}
+ });
+
+ let mongosConn = st.s;
+ assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).createIndex({shard: 1}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+ // Shard the test collection and split it into two chunks: one that contains all {shard: 1}
+ // documents and one that contains all {shard: 2} documents.
+ st.shardColl(collName,
+ {shard: 1} /* Shard key */,
+ {shard: 2} /* Split at */,
+ {shard: 2} /* Move the chunk containing {shard: 2} to its own shard */,
+ dbName,
+ true /* Wait until documents orphaned by the move get deleted */);
+
+ // Insert new documents on both shards, verify that each insertion outputs a result from the
+ // 'changeStream' cursor, verify that the change stream results have monotonically increasing
+ // timestamps, and return the resume token.
+ var nextId = 0;
+ function insertAndValidateChanges(coll, changeStream) {
+ const docsToInsert =
+ Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: i % 2, val: i}));
+ nextId += docsToInsert.length;
+
+ assert.commandWorked(coll.insert(docsToInsert));
+
+ const changeList = [];
+ assert.soon(function() {
+ while (changeStream.hasNext()) {
+ const change = changeStream.next();
+ changeList.push(change);
+ }
+
+ return changeList.length === docsToInsert.length;
+ }, changeList);
+
+ for (let i = 0; i + 1 < changeList.length; ++i) {
+ assert(timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0,
+ "Change timestamps are not monotonically increasing: " + tojson(changeList));
}
- return changeList.length === docsToInsert.length;
- }, changeList);
-
- for (let i = 0; i + 1 < changeList.length; ++i) {
- assert(timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0,
- "Change timestamps are not monotonically increasing: " + tojson(changeList));
+ return changeStream.getResumeToken();
}
- return changeStream.getResumeToken();
+ //
+ // Open and read a change stream on the downgrade version cluster.
+ //
+ let coll = mongosConn.getDB(dbName)[collName];
+ let resumeToken = insertAndValidateChanges(coll, coll.watch());
+
+ //
+ // Upgrade the config db and the shards to the "latest" binVersion.
+ //
+ st.upgradeCluster("latest", {upgradeShards: true, upgradeConfigs: true, upgradeMongos: false});
+
+ //
+ // Open and read a change stream on the upgraded cluster but still using a downgraded version of
+ // mongos and downgraded version for the FCV.
+ //
+ resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+
+ //
+ // Upgrade mongos to the "latest" binVersion and then open and read a change stream, this time
+ // with all cluster nodes upgraded but still in downgraded FCV.
+ //
+ st.upgradeCluster("latest", {upgradeShards: false, upgradeConfigs: false, upgradeMongos: true});
+ mongosConn = st.s;
+ coll = mongosConn.getDB(dbName)[collName];
+
+ resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+
+ //
+ // Set the FCV to the "latest" version, and then open and read a change stream on the completely
+ // upgraded cluster.
+ //
+ assert.commandWorked(mongosConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ checkFCV(st.configRS.getPrimary().getDB("admin"), latestFCV);
+ checkFCV(st.rs0.getPrimary().getDB("admin"), latestFCV);
+ checkFCV(st.rs1.getPrimary().getDB("admin"), latestFCV);
+
+ //
+ // Open and read a change stream on the upgraded cluster.
+ //
+ resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
+
+ st.stop();
}
-//
-// Open and read a change stream on the "last-lts" cluster.
-//
-let coll = mongosConn.getDB(dbName)[collName];
-let resumeToken = insertAndValidateChanges(coll, coll.watch());
-
-//
-// Upgrade the config db and the shards to the "latest" binVersion.
-//
-st.upgradeCluster("latest", {upgradeShards: true, upgradeConfigs: true, upgradeMongos: false});
-
-//
-// Open and read a change stream on the upgraded cluster but still using a "last-lts" version of
-// mongos and "last-lts" for the FCV.
-//
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
-
-//
-// Upgrade mongos to the "latest" binVersion and then open and read a change stream, this time with
-// all cluster nodes upgraded but still in "last-lts" FCV.
-//
-st.upgradeCluster("latest", {upgradeShards: false, upgradeConfigs: false, upgradeMongos: true});
-mongosConn = st.s;
-coll = mongosConn.getDB(dbName)[collName];
-
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
-
-//
-// Set the FCV to the "latest" version, and then open and read a change stream on the completely
-// upgraded cluster.
-//
-assert.commandWorked(mongosConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
-checkFCV(st.configRS.getPrimary().getDB("admin"), latestFCV);
-checkFCV(st.rs0.getPrimary().getDB("admin"), latestFCV);
-checkFCV(st.rs1.getPrimary().getDB("admin"), latestFCV);
-
-//
-// Open and read a change stream on the upgraded cluster.
-//
-resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));
-
-st.stop();
+runTest("last-continuous");
+runTest("last-lts");
}());
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js
index 12a28dabf05..9bb9c4b8383 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_multi_version_transaction.js
@@ -1,4 +1,4 @@
-// Test that resume tokens from a replica set running the "last-lts" mongod can be used to resume
+// Test that resume tokens from a replica set running a downgraded mongod can be used to resume
// a change stream after upgrading the replica set to the "latest" mongod, even when the change
// stream includes multi-statement transactions.
//
@@ -9,26 +9,10 @@
load('jstests/multiVersion/libs/multi_rs.js'); // For upgradeSet.
load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
-const rst = new ReplSetTest({
- nodes: 2,
- nodeOptions: {binVersion: "last-lts"},
-});
-
-if (!startSetIfSupportsReadMajority(rst)) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- rst.stopSet();
- return;
-}
-
-rst.initiate();
-
const dbName = jsTestName();
const watchedCollName = "change_stream_watched";
const unwatchedCollName = "change_stream_unwatched";
-rst.getPrimary().getDB(dbName).createCollection(watchedCollName);
-rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName);
-
// Calls next() on a change stream cursor 'n' times and returns an array with the results.
function getChangeStreamResults(cursor, n) {
let results = [];
@@ -104,24 +88,47 @@ const expectedChanges = [
{operationType: "delete", documentKey: {_id: 1}},
];
-// Create the original change stream, verify it gives us the changes we expect, and verify that
-// we can correctly resume from any resume token.
-const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch();
-performDBOps(rst.getPrimary());
-const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length);
-compareChanges(expectedChanges, changeStreamDocs);
-resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
-// Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that
-// we can correctly resume from any resume token.
-rst.upgradeSet({binVersion: "latest"});
-resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
-// Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any
-// resume token.
-assert.commandWorked(rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV}));
-checkFCV(rst.getPrimary().getDB("admin"), latestFCV);
-resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
-
-rst.stopSet();
+function runTest(downgradeVersion) {
+ const rst = new ReplSetTest({
+ nodes: 2,
+ nodeOptions: {binVersion: downgradeVersion},
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+
+ jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
+ rst.initiate();
+
+ rst.getPrimary().getDB(dbName).createCollection(watchedCollName);
+ rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName);
+
+ // Create the original change stream, verify it gives us the changes we expect, and verify that
+ // we can correctly resume from any resume token.
+ const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch();
+ performDBOps(rst.getPrimary());
+ const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length);
+ compareChanges(expectedChanges, changeStreamDocs);
+ resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+ // Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that
+ // we can correctly resume from any resume token.
+ rst.upgradeSet({binVersion: "latest"});
+ resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+ // Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any
+ // resume token.
+ assert.commandWorked(
+ rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ checkFCV(rst.getPrimary().getDB("admin"), latestFCV);
+ resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
+
+ rst.stopSet();
+}
+
+runTest("last-continuous");
+runTest("last-lts");
}());
diff --git a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
index 4ea15f02a62..99d5b26af74 100644
--- a/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
+++ b/jstests/multiVersion/genericChangeStreams/change_streams_read_oplog_after_downgrade.js
@@ -1,6 +1,6 @@
/**
- * Verifies that a change stream which is resumed on a downgraded last-lts binary does not crash
- * the server, even when reading oplog entries which the last-lts binary may not understand.
+ * Verifies that a change stream which is resumed on a downgraded binary does not crash
+ * the server, even when reading oplog entries which the downgraded binary may not understand.
*
* @tags: [uses_change_streams, requires_replication]
*/
@@ -220,7 +220,7 @@ function writeOplogEntriesAndCreateResumePointsOnLatestVersion() {
* should be an array and each entry should have fields 'watch', 'resumeToken' and
* 'endSentinelEntry'.
*/
-function resumeStreamsOnLastLTSVersion(changeStreams) {
+function resumeStreamsOnDowngradedVersion(changeStreams) {
for (let changeStream of changeStreams) {
jsTestLog("Validating change stream for " + tojson(changeStream));
const csCursor = changeStream.watch({resumeAfter: changeStream.resumeToken});
@@ -252,16 +252,30 @@ function resumeStreamsOnLastLTSVersion(changeStreams) {
// cluster has been downgraded.
const changeStreamsToBeValidated = writeOplogEntriesAndCreateResumePointsOnLatestVersion();
-// Downgrade the entire cluster to 'last-lts' binVersion.
-assert.commandWorked(st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
-st.upgradeCluster("last-lts");
+function runTests(downgradeVersion) {
+ jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
+ const downgradeFCV = downgradeVersion === "last-lts" ? lastLTSFCV : lastContinuousFCV;
+ // Downgrade the entire cluster to the 'downgradeVersion' binVersion.
+ assert.commandWorked(
+ st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: downgradeFCV}));
+ st.upgradeCluster(downgradeVersion);
-// Refresh our reference to the sharded collection post-downgrade.
-shardedColl = st.s.getDB(dbName)[collName];
+ // Refresh our reference to the sharded collection post-downgrade.
+ shardedColl = st.s.getDB(dbName)[collName];
-// Resume all the change streams that were created on latest version and validate that the change
-// stream doesn't crash the server after downgrade.
-resumeStreamsOnLastLTSVersion(changeStreamsToBeValidated);
+ // Resume all the change streams that were created on latest version and validate that the
+ // change stream doesn't crash the server after downgrade.
+ resumeStreamsOnDowngradedVersion(changeStreamsToBeValidated);
+}
+
+// Test resuming change streams after downgrading the cluster to 'last-continuous'.
+runTests('last-continuous');
+
+// Upgrade the entire cluster back to the latest version.
+st.upgradeCluster('latest', {waitUntilStable: true});
+assert.commandWorked(st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+// Test resuming change streams after downgrading the cluster to 'last-lts'.
+runTests('last-lts');
st.stop();
}());