summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/change_streams_high_water_mark_cluster.js
blob: 9abb4c911f26ee315cfe7537faeba786a5e97e18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
/**
 * Tests that high water mark and postBatchResumeTokens are handled correctly during upgrade from
 * and downgrade to both 3.6 and a pre-backport version of 4.0 on a sharded cluster.
 * @tags: [uses_change_streams]
 */
(function() {
    "use strict";

    load("jstests/libs/collection_drop_recreate.js");                 // assertCreateCollection
    load("jstests/libs/fixture_helpers.js");                          // runCommandOnEachPrimary
    load("jstests/multiVersion/libs/causal_consistency_helpers.js");  // supportsMajorityReadConcern
    load("jstests/multiVersion/libs/change_stream_hwm_helpers.js");   // ChangeStreamHWMHelpers
    load("jstests/multiVersion/libs/multi_cluster.js");               // upgradeCluster
    load("jstests/multiVersion/libs/multi_rs.js");                    // upgradeSet

    // Bail out before any cluster is started if supportsMajorityReadConcern() reports that
    // majority read concern is unavailable; in that case the remainder of this test is skipped.
    if (!supportsMajorityReadConcern()) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        return;
    }

    // Binary versions exercised by this test, as exposed by ChangeStreamHWMHelpers.
    const {preBackport40Version, latest40Version, latest36Version} = ChangeStreamHWMHelpers;

    // Server parameters applied to every shard replica set at startup.
    const shardSetParameters = {writePeriodicNoops: true, periodicNoopIntervalSecs: 1};

    // Every component of the cluster (mongoS, config servers, shards) starts on the latest 3.6
    // binary.
    const initialClusterOptions = {
        mongosOptions: {binVersion: latest36Version},
        configOptions: {binVersion: latest36Version},
        rsOptions: {binVersion: latest36Version, setParameter: shardSetParameters},
    };

    // Two shards, each a three-node replica set, fronted by a single mongoS.
    const st = new ShardingTest({shards: 2, mongos: 1, rs: {nodes: 3}, other: initialClusterOptions});

    // Handles on the test database via mongoS and on the primary of shard0. Both are declared
    // with 'let' because refreshCluster() reassigns them after each upgrade or downgrade.
    let mongosDB = st.s.getDB(jsTestName());
    let primaryShard = st.rs0.getPrimary();

    // Names of the two collections used throughout this test.
    const shardedCollName = "sharded_coll";
    const unshardedCollName = "unsharded_coll";

    /**
     * Moves part of the cluster onto binary 'version' and then refreshes the shared 'mongosDB'
     * and 'primaryShard' references.
     *
     * If 'singleShard' is supplied, only that replica set is changed (via its upgradeSet method)
     * and 'components' is ignored. Otherwise 'version' and 'components' are passed straight
     * through to st.upgradeCluster.
     */
    function refreshCluster(version, components, singleShard) {
        if (!singleShard) {
            st.upgradeCluster(version, components);
        } else {
            singleShard.upgradeSet({binVersion: version});
        }

        // Wait for the config servers and both shards to finish replicating, then restart mongoS.
        for (let replSet of [st.configRS, st.rs0, st.rs1]) {
            replSet.awaitReplication();
        }
        st.restartMongoses();

        // The connections may have changed across the restart, so reacquire our references.
        mongosDB = st.s.getDB(jsTestName());
        primaryShard = st.rs0.getPrimary();

        // Re-apply the 'writePeriodicNoops' parameter to the up/downgraded shards.
        FixtureHelpers.runCommandOnEachPrimary({
            db: mongosDB.getSiblingDB("admin"),
            cmdObj: {setParameter: 1, writePeriodicNoops: true}
        });
    }

    // Enable sharding on the test database and ensure that its primary shard is shard0.
    const testDbName = mongosDB.getName();
    assert.commandWorked(mongosDB.adminCommand({enableSharding: testDbName}));
    st.ensurePrimaryShard(testDbName, primaryShard.name);

    // Create the unsharded collection on the primary shard via mongoS.
    assertCreateCollection(mongosDB, unshardedCollName);

    // Create the sharded collection, shard it on {_id: 1} and split it across the shards at
    // {_id: 0}.
    st.shardColl(assertCreateCollection(mongosDB, shardedCollName), {_id: 1}, {_id: 0}, {_id: 1});

    // Maps each collection name to a function which returns that collection, obtained through
    // the current mongoS connection. The lookup is deferred to call time so that a fresh handle
    // is taken from 'st.s' on every use rather than being captured once here. The same names are
    // used by the tests below as the keys under which HWM tokens are recorded, so each name must
    // resolve to its own collection.
    const collMap = {
        [unshardedCollName]: () => st.s.getDB(jsTestName())[unshardedCollName],
        [shardedCollName]: () => st.s.getDB(jsTestName())[shardedCollName]
    };

    /**
     * Drives the cluster through a complete upgrade from 'oldVersion' to 'latest40Version' and
     * back down to 'oldVersion' again, running the high water mark (HWM) and
     * postBatchResumeToken (PBRT) checks at each intermediate stage.
     *
     * 'oldVersionFCVs' is an array of the FCVs supported by 'oldVersion'; the whole
     * upgrade/downgrade cycle is run once per FCV, each time starting with an empty set of
     * recorded HWM tokens.
     */
    function runUpgradeDowngradeTests(oldVersion, oldVersionFCVs) {
        // Forwards to the shared HWM test helper and returns whatever token it reports.
        const runHWMTest = (testOptions) =>
            ChangeStreamHWMHelpers.testPostBatchAndHighWaterMarkTokens(testOptions);

        // Sets the cluster FCV through the current mongoS and asserts that the command worked.
        const setClusterFCV = (fcv) =>
            assert.commandWorked(mongosDB.adminCommand({setFeatureCompatibilityVersion: fcv}));

        // For every collection in 'collMap', runs the helper expecting no PBRT and asserts that
        // no HWM token comes back.
        const expectNoTokens = () => {
            for (let collName in collMap) {
                const hwmToken = runHWMTest({coll: collMap[collName](), expectPBRT: false});
                assert.eq(hwmToken, undefined);
            }
        };

        // As expectNoTokens, but additionally hands the helper the HWM previously recorded in
        // 'tokens' for each collection, with the expectation that it cannot be resumed from.
        const expectNoTokensAndNoResume = (tokens) => {
            for (let collName in collMap) {
                const hwmToken = runHWMTest({
                    coll: collMap[collName](),
                    expectPBRT: false,
                    hwmToResume: tokens[collName],
                    expectResume: false
                });
                assert.eq(hwmToken, undefined);
            }
        };

        // For every collection in 'collMap', runs the helper expecting a PBRT and a successful
        // resume from the HWM currently recorded in 'tokens' (if any), then overwrites that entry
        // with the newly returned HWM, which must not be undefined.
        const expectTokensAndResume = (tokens) => {
            for (let collName in collMap) {
                tokens[collName] = runHWMTest({
                    coll: collMap[collName](),
                    expectPBRT: true,
                    hwmToResume: tokens[collName],
                    expectResume: true
                });
                assert.neq(tokens[collName], undefined);
            }
        };

        for (let minFCV of oldVersionFCVs) {
            // HWM tokens recorded during this FCV's cycle, keyed by collection name.
            const hwmTokenMap = {};

            // Stage 1: the entire cluster runs 'oldVersion'. No PBRTs are expected.
            jsTestLog(`Testing binary ${oldVersion} mongoS and shards, FCV ${minFCV}`);
            refreshCluster(oldVersion);
            setClusterFCV(minFCV);
            expectNoTokens();

            // Stage 2: only shard1 moves to 'latest40Version'; mongoS stays on 'oldVersion'. No
            // stream produces a PBRT, but the new shard should keep producing resumable tokens
            // and the old-style $sortKey while the cluster is mid-upgrade.
            jsTestLog(`Upgrading shard1 to binary ${latest40Version} FCV ${minFCV}`);
            refreshCluster(latest40Version, null, st.rs1);
            expectNoTokens();

            // Stage 3: the remaining shard and the config servers move to 'latest40Version',
            // while mongoS is still left on 'oldVersion'.
            jsTestLog(
                `Upgrading to ${oldVersion} mongoS and ${latest40Version} shards, FCV ${minFCV}`);
            refreshCluster(latest40Version,
                           {upgradeMongos: false, upgradeShards: true, upgradeConfigs: true});

            // The old mongoS should be able to merge the output from the new shards, but neither
            // mongoS stream will produce a PBRT.
            jsTestLog(
                `Testing ${oldVersion} mongoS and ${latest40Version} shards with FCV ${minFCV}`);
            expectNoTokens();

            // Stage 4: mongoS moves to 'latest40Version'; the cluster stays in 'minFCV'.
            jsTestLog(
                `Upgrading to binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
            refreshCluster(latest40Version,
                           {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false});

            // Every stream should now return PBRTs, and the test should hand back a valid HWM.
            jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
            expectTokensAndResume(hwmTokenMap);

            // Stage 5: raise the cluster's FCV to 4.0.
            setClusterFCV("4.0");

            // Streams still return PBRTs, and the HWMs from the previous stage are resumable.
            jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV 4.0`);
            expectTokensAndResume(hwmTokenMap);

            // Stage 6: lower the FCV back to 'minFCV'. PBRTs should continue to be produced and
            // the tokens generated in the preceding stages should remain resumable.
            jsTestLog(`Downgrading to FCV ${minFCV} shards`);
            setClusterFCV(minFCV);

            jsTestLog(`Testing binary ${latest40Version} mongoS and shards with FCV ${minFCV}`);
            expectTokensAndResume(hwmTokenMap);

            // Stage 7: mongoS goes back to 'oldVersion'. New streams can be created and resumed
            // from their own tokens, but the previously-generated v1 tokens cannot be used.
            jsTestLog(`Downgrading to binary ${oldVersion} mongoS with FCV ${minFCV} shards`);
            refreshCluster(oldVersion,
                           {upgradeMongos: true, upgradeShards: false, upgradeConfigs: false});

            // No PBRTs are received via mongoS any more, and the HWMs cannot be resumed from.
            jsTestLog(`Testing binary ${oldVersion} mongoS and binary ${latest40Version} shards`);
            expectNoTokensAndNoResume(hwmTokenMap);

            // Stage 8: shard1 alone goes back to 'oldVersion'. Behaviour should be unchanged from
            // the previous stage.
            jsTestLog(`Downgrading shard1 to binary ${oldVersion} FCV ${minFCV}`);
            refreshCluster(oldVersion, null, st.rs1);
            expectNoTokensAndNoResume(hwmTokenMap);

            // Stage 9: the rest of the cluster goes back to 'oldVersion'; the shards first and
            // then the config servers.
            jsTestLog(`Downgrading to binary ${oldVersion} shards`);
            refreshCluster(oldVersion,
                           {upgradeShards: true, upgradeConfigs: false, upgradeMongos: false});
            refreshCluster(oldVersion,
                           {upgradeConfigs: true, upgradeShards: false, upgradeMongos: false});

            // Still no PBRTs, and the HWM tokens still cannot be resumed from.
            jsTestLog(`Testing downgraded binary ${oldVersion} mongoS and shards`);
            expectNoTokensAndNoResume(hwmTokenMap);
        }
    }

    // Run the upgrade/downgrade cycle from each older binary in turn: first from 3.6, then from
    // the pre-backport 4.0 version under both of the FCVs that it supports.
    const upgradeScenarios = [
        {oldVersion: latest36Version, fcvs: ["3.6"]},
        {oldVersion: preBackport40Version, fcvs: ["3.6", "4.0"]},
    ];
    upgradeScenarios.forEach(
        (scenario) => runUpgradeDowngradeTests(scenario.oldVersion, scenario.fcvs));

    // Shut down the cluster now that every scenario has completed.
    st.stop();
})();