diff options
Diffstat (limited to 'jstests/multiVersion/change_streams_feature_compatibility_version.js')
-rw-r--r-- | jstests/multiVersion/change_streams_feature_compatibility_version.js | 182 |
1 files changed, 90 insertions, 92 deletions
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 23c489893e8..37c8ac7621b 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -3,103 +3,101 @@ // stream after network errors. // @tags: [uses_change_streams] (function() { - "use strict"; +"use strict"; - load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. - load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. +load("jstests/multiVersion/libs/multi_rs.js"); // For upgradeSet. +load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - const rst = new ReplSetTest({ - nodes: 2, - nodeOptions: {binVersion: "last-stable"}, - }); +const rst = new ReplSetTest({ + nodes: 2, + nodeOptions: {binVersion: "last-stable"}, +}); - if (!startSetIfSupportsReadMajority(rst)) { - jsTestLog("Skipping test since storage engine doesn't support majority read concern."); - rst.stopSet(); - return; - } - - rst.initiate(); - - let testDB = rst.getPrimary().getDB(jsTestName()); - let coll = testDB.change_stream_upgrade; - - // Open a change stream against a 4.0 binary. We will use the resume token from this stream to - // resume the stream once the set has been upgraded. - let streamStartedOnOldVersion = coll.watch(); - assert.commandWorked(coll.insert({_id: "first insert, just for resume token"})); - - assert.soon(() => streamStartedOnOldVersion.hasNext()); - let change = streamStartedOnOldVersion.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change)); - const resumeTokenFromLastStable = change._id; - - assert.commandWorked(coll.insert({_id: "before binary upgrade"})); - // Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0. - rst.upgradeSet({binVersion: "latest"}); - testDB = rst.getPrimary().getDB(jsTestName()); - coll = testDB.change_stream_upgrade; - - // Test that we can resume the stream on the new binaries. - streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); - assert.soon(() => streamStartedOnOldVersion.hasNext()); - change = streamStartedOnOldVersion.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); - - let streamStartedOnNewVersionOldFCV = coll.watch(); - - assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"})); - - let resumeTokenFromNewVersionOldFCV; - [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => { - assert.soon(() => stream.hasNext()); - change = stream.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq( - change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change)); - if (resumeTokenFromNewVersionOldFCV === undefined) { - resumeTokenFromNewVersionOldFCV = change._id; - } else { - assert.eq(resumeTokenFromNewVersionOldFCV, change._id); - } - }); - - // Explicitly set feature compatibility version to 4.2. - assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"})); - - const streamStartedOnNewVersion = coll.watch(); - - // Test that we can still resume with the token from the old version. We should see the same - // document again. - streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); - assert.soon(() => streamStartedOnOldVersion.hasNext()); - change = streamStartedOnOldVersion.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); - - assert.soon(() => streamStartedOnOldVersion.hasNext()); - change = streamStartedOnOldVersion.next(); +if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; +} + +rst.initiate(); + +let testDB = rst.getPrimary().getDB(jsTestName()); +let coll = testDB.change_stream_upgrade; + +// Open a change stream against a 4.0 binary. We will use the resume token from this stream to +// resume the stream once the set has been upgraded. +let streamStartedOnOldVersion = coll.watch(); +assert.commandWorked(coll.insert({_id: "first insert, just for resume token"})); + +assert.soon(() => streamStartedOnOldVersion.hasNext()); +let change = streamStartedOnOldVersion.next(); +assert.eq(change.operationType, "insert", tojson(change)); +assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change)); +const resumeTokenFromLastStable = change._id; + +assert.commandWorked(coll.insert({_id: "before binary upgrade"})); +// Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0. +rst.upgradeSet({binVersion: "latest"}); +testDB = rst.getPrimary().getDB(jsTestName()); +coll = testDB.change_stream_upgrade; + +// Test that we can resume the stream on the new binaries. +streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); +assert.soon(() => streamStartedOnOldVersion.hasNext()); +change = streamStartedOnOldVersion.next(); +assert.eq(change.operationType, "insert", tojson(change)); +assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); + +let streamStartedOnNewVersionOldFCV = coll.watch(); + +assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"})); + +let resumeTokenFromNewVersionOldFCV; +[streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => { + assert.soon(() => stream.hasNext()); + change = stream.next(); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change)); - - assert.commandWorked(coll.insert({_id: "after fcv upgrade"})); - const resumedStreamOnNewVersion = - coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV}); - - // Test that all open streams continue to produce change events, and that the newly resumed - // stream sees the write that just happened since it comes after the resume token used. - for (let stream of[streamStartedOnOldVersion, - streamStartedOnNewVersionOldFCV, - streamStartedOnNewVersion, - resumedStreamOnNewVersion]) { - assert.soon(() => stream.hasNext()); - change = stream.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change)); - stream.close(); + if (resumeTokenFromNewVersionOldFCV === undefined) { + resumeTokenFromNewVersionOldFCV = change._id; + } else { + assert.eq(resumeTokenFromNewVersionOldFCV, change._id); } +}); + +// Explicitly set feature compatibility version to 4.2. +assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"})); + +const streamStartedOnNewVersion = coll.watch(); + +// Test that we can still resume with the token from the old version. We should see the same +// document again. +streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable}); +assert.soon(() => streamStartedOnOldVersion.hasNext()); +change = streamStartedOnOldVersion.next(); +assert.eq(change.operationType, "insert", tojson(change)); +assert.eq(change.documentKey._id, "before binary upgrade", tojson(change)); + +assert.soon(() => streamStartedOnOldVersion.hasNext()); +change = streamStartedOnOldVersion.next(); +assert.eq(change.operationType, "insert", tojson(change)); +assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change)); + +assert.commandWorked(coll.insert({_id: "after fcv upgrade"})); +const resumedStreamOnNewVersion = coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV}); + +// Test that all open streams continue to produce change events, and that the newly resumed +// stream sees the write that just happened since it comes after the resume token used. +for (let stream of [streamStartedOnOldVersion, + streamStartedOnNewVersionOldFCV, + streamStartedOnNewVersion, + resumedStreamOnNewVersion]) { + assert.soon(() => stream.hasNext()); + change = stream.next(); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change)); + stream.close(); +} - rst.stopSet(); +rst.stopSet(); }()); |