summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/change_streams_high_water_mark_replset.js
blob: c06f5770e1fb7fa8b09200a9ff86922c4ce619a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
 * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from
 * and downgrade to both 3.6 and a pre-backport version of 4.0 on a single replica set.
 */
(function() {
    "use strict";

    load("jstests/libs/collection_drop_recreate.js");                // For assertCreateCollection.
    load("jstests/multiVersion/libs/change_stream_hwm_helpers.js");  // For ChangeStreamHWMHelpers.
    load("jstests/multiVersion/libs/multi_rs.js");                   // For upgradeSet.
    load("jstests/replsets/rslib.js");  // For startSetIfSupportsReadMajority.

    const preBackport40Version = ChangeStreamHWMHelpers.preBackport40Version;
    const latest40Version = ChangeStreamHWMHelpers.latest40Version;
    const latest36Version = ChangeStreamHWMHelpers.latest36Version;

    const rst = new ReplSetTest({
        nodes: 2,
        nodeOptions: {binVersion: latest36Version},
    });
    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }
    rst.initiate();

    // Obtain references to the test database and create the test collection.
    let testDB = rst.getPrimary().getDB(jsTestName());
    assert.commandWorked(testDB.createCollection("test"));
    let testColl = testDB.test;

    /**
     * Moves the replset onto the binary given by 'version' (which may be an upgrade or a
     * downgrade), waits for replication, and then re-points the shared 'testDB' and 'testColl'
     * references at the primary found afterwards.
     */
    function refreshReplSet(version) {
        const targetOptions = {binVersion: version};
        rst.upgradeSet(targetOptions);
        rst.awaitReplication();

        // The handles obtained before the binary change are discarded; fetch fresh ones from the
        // current primary.
        const currentPrimary = rst.getPrimary();
        testDB = currentPrimary.getDB(jsTestName());
        testColl = testDB.test;
    }

    /**
     * Walks the replset through a full upgrade/downgrade cycle between 'oldVersion' and
     * 'latest40Version', checking post-batch resume token (PBRT) and high water mark (HWM)
     * behaviour at every step.
     *
     * 'oldVersionFCVs' is an array of the FCVs supported by 'oldVersion'; the whole cycle is
     * repeated once for each entry, using that entry as the lower FCV.
     */
    function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) {
        // Sets the replset's FCV through whichever primary 'testDB' currently points at.
        function setFCV(fcv) {
            assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: fcv}));
        }

        // Runs the shared PBRT/HWM checks against the current test collection and returns
        // whatever token the helper hands back.
        function checkTokens(expectations) {
            return ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(
                Object.assign({coll: testColl}, expectations));
        }

        // Expects PBRTs on every stream and a successful resume from 'previousToken'. Verifies
        // that a new HWM token came back, and returns it.
        function checkResumableFrom(previousToken) {
            const nextToken =
                checkTokens({expectPBRT: true, hwmToResume: previousToken, expectResume: true});
            assert.neq(nextToken, undefined);
            return nextToken;
        }

        for (let minFCV of oldVersionFCVs) {
            // Begin on the 'oldVersion' binary, where no stream is expected to produce PBRTs.
            jsTestLog(`Testing binary ${oldVersion}`);
            refreshReplSet(oldVersion);
            setFCV(minFCV);
            const oldBinaryToken = checkTokens({expectPBRT: false});
            assert.eq(oldBinaryToken, undefined);

            // Move to the 'latest40Version' binary while staying on FCV 'minFCV'.
            jsTestLog(`Upgrading to binary ${latest40Version} with FCV ${minFCV}`);
            refreshReplSet(latest40Version);

            // From here on all streams should return PBRTs, high water marks included.
            jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`);
            let hwmToken = checkResumableFrom(oldBinaryToken);

            // Raise the FCV to 4.0; the HWM from the previous step must still be resumable.
            setFCV("4.0");
            jsTestLog(`Testing binary ${latest40Version} with FCV 4.0`);
            hwmToken = checkResumableFrom(hwmToken);

            // Drop the FCV back to 'minFCV'; PBRTs and resumability are expected to persist.
            jsTestLog(`Downgrading to FCV ${minFCV}`);
            setFCV(minFCV);
            jsTestLog(`Testing binary ${latest40Version} with FCV ${minFCV}`);
            hwmToken = checkResumableFrom(hwmToken);

            // Return to the 'oldVersion' binary.
            jsTestLog(`Downgrading to binary ${oldVersion}`);
            refreshReplSet(oldVersion);

            // PBRTs disappear again, and the HWM generated on the newer binary cannot be resumed.
            jsTestLog(`Testing downgraded binary ${oldVersion}`);
            checkTokens({expectPBRT: false, hwmToResume: hwmToken, expectResume: false});
        }
    }

    // Each scenario pairs an older binary with the FCVs it supports. The 3.6 scenario runs first,
    // followed by the pre-backport 4.0 scenario.
    const upgradeScenarios = [
        {oldVersion: latest36Version, oldVersionFCVs: ["3.6"]},
        {oldVersion: preBackport40Version, oldVersionFCVs: ["3.6", "4.0"]},
    ];
    for (let scenario of upgradeScenarios) {
        runUpgradeDowngradeTests(scenario.oldVersion, scenario.oldVersionFCVs);
    }

    // Shut the replica set down once both scenarios have completed.
    rst.stopSet();
})();