summaryrefslogtreecommitdiff
path: root/jstests/multiVersion/change_streams_feature_compatibility_version.js
blob: 23c489893e8b415b5fa8f2230a14c3030865f3ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Test that a change stream is able to survive an upgrade. This is the most basic test to
// demonstrate the survival of a stream, presuming the driver will attempt to retry and resume the
// stream after network errors.
// @tags: [uses_change_streams]
(function() {
    "use strict";

    load("jstests/multiVersion/libs/multi_rs.js");  // For upgradeSet.
    load("jstests/replsets/rslib.js");              // For startSetIfSupportsReadMajority.

    const rst = new ReplSetTest({
        nodes: 2,
        nodeOptions: {binVersion: "last-stable"},
    });

    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }

    rst.initiate();

    let testDB = rst.getPrimary().getDB(jsTestName());
    let coll = testDB.change_stream_upgrade;

    // Open a change stream against a 4.0 binary. We will use the resume token from this stream to
    // resume the stream once the set has been upgraded.
    let streamStartedOnOldVersion = coll.watch();
    assert.commandWorked(coll.insert({_id: "first insert, just for resume token"}));

    assert.soon(() => streamStartedOnOldVersion.hasNext());
    let change = streamStartedOnOldVersion.next();
    assert.eq(change.operationType, "insert", tojson(change));
    assert.eq(change.documentKey._id, "first insert, just for resume token", tojson(change));
    const resumeTokenFromLastStable = change._id;

    assert.commandWorked(coll.insert({_id: "before binary upgrade"}));
    // Upgrade the set to the new binary version, but keep the feature compatibility version at 4.0.
    rst.upgradeSet({binVersion: "latest"});
    testDB = rst.getPrimary().getDB(jsTestName());
    coll = testDB.change_stream_upgrade;

    // Test that we can resume the stream on the new binaries.
    streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
    assert.soon(() => streamStartedOnOldVersion.hasNext());
    change = streamStartedOnOldVersion.next();
    assert.eq(change.operationType, "insert", tojson(change));
    assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));

    let streamStartedOnNewVersionOldFCV = coll.watch();

    assert.commandWorked(coll.insert({_id: "after binary upgrade, before fcv switch"}));

    let resumeTokenFromNewVersionOldFCV;
    [streamStartedOnOldVersion, streamStartedOnNewVersionOldFCV].forEach(stream => {
        assert.soon(() => stream.hasNext());
        change = stream.next();
        assert.eq(change.operationType, "insert", tojson(change));
        assert.eq(
            change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));
        if (resumeTokenFromNewVersionOldFCV === undefined) {
            resumeTokenFromNewVersionOldFCV = change._id;
        } else {
            assert.eq(resumeTokenFromNewVersionOldFCV, change._id);
        }
    });

    // Explicitly set feature compatibility version to 4.2.
    assert.commandWorked(testDB.adminCommand({setFeatureCompatibilityVersion: "4.2"}));

    const streamStartedOnNewVersion = coll.watch();

    // Test that we can still resume with the token from the old version. We should see the same
    // document again.
    streamStartedOnOldVersion = coll.watch([], {resumeAfter: resumeTokenFromLastStable});
    assert.soon(() => streamStartedOnOldVersion.hasNext());
    change = streamStartedOnOldVersion.next();
    assert.eq(change.operationType, "insert", tojson(change));
    assert.eq(change.documentKey._id, "before binary upgrade", tojson(change));

    assert.soon(() => streamStartedOnOldVersion.hasNext());
    change = streamStartedOnOldVersion.next();
    assert.eq(change.operationType, "insert", tojson(change));
    assert.eq(change.documentKey._id, "after binary upgrade, before fcv switch", tojson(change));

    assert.commandWorked(coll.insert({_id: "after fcv upgrade"}));
    const resumedStreamOnNewVersion =
        coll.watch([], {resumeAfter: resumeTokenFromNewVersionOldFCV});

    // Test that all open streams continue to produce change events, and that the newly resumed
    // stream sees the write that just happened since it comes after the resume token used.
    for (let stream of[streamStartedOnOldVersion,
                       streamStartedOnNewVersionOldFCV,
                       streamStartedOnNewVersion,
                       resumedStreamOnNewVersion]) {
        assert.soon(() => stream.hasNext());
        change = stream.next();
        assert.eq(change.operationType, "insert", tojson(change));
        assert.eq(change.documentKey._id, "after fcv upgrade", tojson(change));
        stream.close();
    }

    rst.stopSet();
}());