summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/change_streams_feature_compatibility_version.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/multiVersion/change_streams_feature_compatibility_version.js')
-rw-r--r--jstests/multiVersion/change_streams_feature_compatibility_version.js182
1 files changed, 90 insertions, 92 deletions
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 23c489893e8..37c8ac7621b 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -3,103 +3,101 @@
// stream after network errors.
// @tags: [uses_change_streams]
(function() {
- "use strict";
+"use strict";
- load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
- load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet.
+load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
- const rst = new ReplSetTest({
- nodes: 2,
- nodeOptions: {binVersion: "last-stable"},
- });
+const rst = new ReplSetTest({
+ nodes: 2,
+ nodeOptions: {binVersion: "last-stable"},
+});
- if (!startSetIfSupportsReadMajority(rst)) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- rst.stopSet();
- return;
- }
-
- rst.initiate();
-
- let testDB = rst.getPrimary().getDB(jsTestName());
- let coll = testDB.change_stream_upgrade;
-
- // Open a change stream against a 4.0 binary. We will use the resume token from this stream to
- // resume the stream once the set has been upgraded.
- let streamStartedOnOldVersion = coll.watch();
- assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));
-
- assert.soon(() => streamStartedOnOldVersion.hasNext());
- let change = streamStartedOnOldVersion.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
- const resumeTokenFromLastStable = change._id;
-
- assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
- // Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0.
- rst.upgradeSet({binVersion: "latest"});
- testDB = rst.getPrimary().getDB(jsTestName());
- coll = testDB.change_stream_upgrade;
-
- // Test that we can resume the stream on the new binaries.
- streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
- assert.soon(() => streamStartedOnOldVersion.hasNext());
- change = streamStartedOnOldVersion.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
-
- let streamStartedOnNewVersionOldFCV = coll.watch();
-
- assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));
-
- let resumeTokenFromNewVersionOldFCV;
- [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
- assert.soon(() => stream.hasNext());
- change = stream.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(
- change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
- if (resumeTokenFromNewVersionOldFCV === undefined) {
- resumeTokenFromNewVersionOldFCV = change._id;
- } else {
- assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
- }
- });
-
- // Explicitly set feature compatibility version to 4.2.
- assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"}));
-
- const streamStartedOnNewVersion = coll.watch();
-
- // Test that we can still resume with the token from the old version. We should see the same
- // document again.
- streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
- assert.soon(() => streamStartedOnOldVersion.hasNext());
- change = streamStartedOnOldVersion.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
-
- assert.soon(() => streamStartedOnOldVersion.hasNext());
- change = streamStartedOnOldVersion.next();
+if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+}
+
+rst.initiate();
+
+let testDB = rst.getPrimary().getDB(jsTestName());
+let coll = testDB.change_stream_upgrade;
+
+// Open a change stream against a 4.0 binary. We will use the resume token from this stream to
+// resume the stream once the set has been upgraded.
+let streamStartedOnOldVersion = coll.watch();
+assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));
+
+assert.soon(() => streamStartedOnOldVersion.hasNext());
+let change = streamStartedOnOldVersion.next();
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
+const resumeTokenFromLastStable = change._id;
+
+assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
+// Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0.
+rst.upgradeSet({binVersion: "latest"});
+testDB = rst.getPrimary().getDB(jsTestName());
+coll = testDB.change_stream_upgrade;
+
+// Test that we can resume the stream on the new binaries.
+streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
+assert.soon(() => streamStartedOnOldVersion.hasNext());
+change = streamStartedOnOldVersion.next();
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+
+let streamStartedOnNewVersionOldFCV = coll.watch();
+
+assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));
+
+let resumeTokenFromNewVersionOldFCV;
+[streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
-
- assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
- const resumedStreamOnNewVersion =
- coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});
-
- // Test that all open streams continue to produce change events, and that the newly resumed
- // stream sees the write that just happened since it comes after the resume token used.
- for (let stream of[streamStartedOnOldVersion,
- streamStartedOnNewVersionOldFCV,
- streamStartedOnNewVersion,
- resumedStreamOnNewVersion]) {
- assert.soon(() => stream.hasNext());
- change = stream.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
- stream.close();
+ if (resumeTokenFromNewVersionOldFCV === undefined) {
+ resumeTokenFromNewVersionOldFCV = change._id;
+ } else {
+ assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
}
+});
+
+// Explicitly set feature compatibility version to 4.2.
+assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"}));
+
+const streamStartedOnNewVersion = coll.watch();
+
+// Test that we can still resume with the token from the old version. We should see the same
+// document again.
+streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
+assert.soon(() => streamStartedOnOldVersion.hasNext());
+change = streamStartedOnOldVersion.next();
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));
+
+assert.soon(() => streamStartedOnOldVersion.hasNext());
+change = streamStartedOnOldVersion.next();
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
+
+assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
+const resumedStreamOnNewVersion = coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});
+
+// Test that all open streams continue to produce change events, and that the newly resumed
+// stream sees the write that just happened since it comes after the resume token used.
+for (let stream of [streamStartedOnOldVersion,
+ streamStartedOnNewVersionOldFCV,
+ streamStartedOnNewVersion,
+ resumedStreamOnNewVersion]) {
+ assert.soon(() => stream.hasNext());
+ change = stream.next();
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
+ stream.close();
+}
- rst.stopSet();
+rst.stopSet();
}());