// path: jstests/multiVersion/genericChangeStreams/change_streams_multi_version_cluster.js
// blob: 7a207192fda21da9965113abb47cf0e5b08e1b03
// Verify that we can successfully resume a change stream during several different stages of a
// cluster upgrade.
//
// @tags: [uses_change_streams, requires_replication]

// Checking UUID consistency uses cached connections, which are not valid across restarts or
// stepdowns. This test upgrades the cluster's binaries via upgradeCluster(), so skip that check.
TestData.skipCheckingUUIDsConsistentAcrossCluster = true;

(function() {
"use strict";

load("jstests/multiVersion/libs/multi_rs.js");       // Used by upgradeSet.
load("jstests/multiVersion/libs/multi_cluster.js");  // For upgradeCluster.

// Database and collection that the change streams in this test are opened against.
const dbName = "test";
const collName = "change_streams_multi_version_sortkey";

/**
 * Starts a two-shard cluster in which every mongod and mongos runs the 'downgradeVersion' binary,
 * then upgrades it in stages (shards and config servers, then mongos, then the FCV). After each
 * stage a change stream is resumed from the token obtained in the previous stage, and a fresh
 * batch of inserts is verified to come through it.
 *
 * @param {string} downgradeVersion - binVersion that all nodes of the cluster start out on.
 */
function runTest(downgradeVersion) {
    jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
    // Start a sharded cluster in which all mongod and mongos processes are of the downgraded
    // binVersion. We set "writePeriodicNoops" to write to the oplog every 1 second, which ensures
    // that test change streams do not wait for longer than 1 second if one of the shards has no
    // changes to report.
    const st = new ShardingTest({
        shards: 2,
        rs: {
            nodes: 2,
            binVersion: downgradeVersion,
            setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
        },
        other: {mongosOptions: {binVersion: downgradeVersion}}
    });

    // Reassigned further down, once mongos has been upgraded.
    let mongosConn = st.s;
    assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).createIndex({shard: 1}));
    st.ensurePrimaryShard(dbName, st.shard0.shardName);

    // Shard the test collection and split it into two chunks: one that contains all {shard: 1}
    // documents and one that contains all {shard: 2} documents.
    st.shardColl(collName,
                 {shard: 1} /* Shard key */,
                 {shard: 2} /* Split at */,
                 {shard: 2} /* Move the chunk containing {shard: 2} to its own shard */,
                 dbName,
                 true /* Wait until documents orphaned by the move get deleted */);

    // Next unused _id value; advanced by every call to insertAndValidateChanges() so that the
    // batches inserted at different upgrade stages never collide.
    let nextId = 0;

    /**
     * Inserts new documents on both shards, verifies that each insertion outputs a result from
     * the 'changeStream' cursor, verifies that the change stream results have monotonically
     * increasing timestamps, and returns the resume token.
     *
     * @param coll - collection to insert into.
     * @param changeStream - change stream cursor already opened on 'coll'.
     * @returns the resume token reported by 'changeStream' after all changes were consumed.
     */
    function insertAndValidateChanges(coll, changeStream) {
        // Alternate the shard key between 1 and 2 so that the documents fall on both sides of the
        // {shard: 2} split point, i.e. into both chunks.
        const docsToInsert =
            Array.from({length: 10}, (_, i) => ({_id: nextId + i, shard: (i % 2) + 1, val: i}));
        nextId += docsToInsert.length;

        assert.commandWorked(coll.insert(docsToInsert));

        // Drain the cursor repeatedly until one change per inserted document has been seen.
        const changeList = [];
        assert.soon(function() {
            while (changeStream.hasNext()) {
                const change = changeStream.next();
                changeList.push(change);
            }

            return changeList.length === docsToInsert.length;
        }, () => "Expected " + docsToInsert.length + " change events, but observed " +
               changeList.length + ": " + tojson(changeList));

        // Equal timestamps are tolerated; only a decrease between neighbours is an error.
        for (let i = 0; i + 1 < changeList.length; ++i) {
            assert(timestampCmp(changeList[i].clusterTime, changeList[i + 1].clusterTime) <= 0,
                   "Change timestamps are not monotonically increasing: " + tojson(changeList));
        }

        return changeStream.getResumeToken();
    }

    //
    // Open and read a change stream on the downgrade version cluster.
    //
    let coll = mongosConn.getDB(dbName)[collName];
    let resumeToken = insertAndValidateChanges(coll, coll.watch());

    //
    // Upgrade the config db and the shards to the "latest" binVersion.
    //
    st.upgradeCluster(
        "latest",
        {upgradeShards: true, upgradeConfigs: true, upgradeMongos: false, waitUntilStable: true});

    //
    // Open and read a change stream on the upgraded cluster but still using a downgraded version of
    // mongos and downgraded version for the FCV.
    //
    resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));

    //
    // Upgrade mongos to the "latest" binVersion and then open and read a change stream, this time
    // with all cluster nodes upgraded but still in downgraded FCV.
    //
    st.upgradeCluster(
        "latest",
        {upgradeShards: false, upgradeConfigs: false, upgradeMongos: true, waitUntilStable: true});
    // Re-read the mongos connection and collection handle from 'st' after the mongos upgrade.
    mongosConn = st.s;
    coll = mongosConn.getDB(dbName)[collName];

    resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));

    //
    // Set the FCV to the "latest" version, and then open and read a change stream on the completely
    // upgraded cluster.
    //
    assert.commandWorked(mongosConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
    checkFCV(st.configRS.getPrimary().getDB("admin"), latestFCV);
    checkFCV(st.rs0.getPrimary().getDB("admin"), latestFCV);
    checkFCV(st.rs1.getPrimary().getDB("admin"), latestFCV);

    //
    // Open and read a change stream on the upgraded cluster.
    //
    resumeToken = insertAndValidateChanges(coll, coll.watch([], {resumeAfter: resumeToken}));

    st.stop();
}

// Run the whole upgrade scenario once per starting binary version, in this order.
for (const startingBinVersion of ["last-continuous", "last-lts"]) {
    runTest(startingBinVersion);
}
}());