summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_primary_shard_unaware.js
blob: 1fdb86564ae2b090d971cef6f414bddee0421e40 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Tests the behavior of a change stream on a collection that becomes sharded while the primary
// shard is unaware of it and still sees the collection as unsharded.
//
// This test triggers a compiler bug that causes a crash when compiling with optimizations on, see
// SERVER-36321.
// @tags: [requires_persistence, blacklist_from_rhel_67_s390x, uses_change_streams]
(function() {
    "use strict";

    load('jstests/libs/change_stream_util.js');  // For ChangeStreamTest.

    // For supportsMajorityReadConcern().
    load("jstests/multiVersion/libs/causal_consistency_helpers.js");

    // TODO (SERVER-38673): Remove this once BACKPORT-3428, BACKPORT-3429 are completed.
    if (!jsTestOptions().enableMajorityReadConcern &&
        jsTestOptions().mongosBinVersion === 'last-stable') {
        jsTestLog(
            "Skipping test since 'last-stable' mongos doesn't support speculative majority update lookup queries.");
        return;
    }

    if (!supportsMajorityReadConcern()) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        return;
    }

    // Reports whether 'shard' currently knows that the collection is sharded.
    //
    // 'shard' is a connection exposing adminCommand(); 'coll' is the full namespace passed to
    // getShardVersion. The command must succeed, otherwise assert.commandWorked() throws.
    // Awareness is judged solely by whether the reply's metadata carries a collVersion that is
    // neither undefined nor null.
    function isShardAware(shard, coll) {
        const reply = shard.adminCommand({getShardVersion: coll, fullMetadata: true});
        assert.commandWorked(reply);

        const {collVersion} = reply.metadata;
        return !(collVersion === undefined || collVersion === null);
    }

    // The test name doubles as the database name and the collection name further down.
    const testName = "change_streams_primary_shard_unaware";

    // Each shard is a single-node replica set. Use a higher frequency for periodic noops to
    // speed up the test.
    const replSetOptions = {
        nodes: 1,
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true},
    };

    // Two shards behind three mongos routers (st.s0, st.s1 and st.s2 are all used below).
    const st = new ShardingTest({shards: 2, mongos: 3, rs: replSetOptions});

    // Handle on the test database through the first mongos (st.s0). The database and the
    // collection created below are both named after the test.
    const mongosDB = st.s0.getDB(testName);

    // Ensure that shard0 is the primary shard.
    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());

    // Create unsharded collection on primary shard.
    const mongosColl = mongosDB[testName];
    assert.commandWorked(mongosDB.createCollection(testName));

    // Before sharding the collection, issue a write through mongos2 to ensure that it knows the
    // collection exists and believes it is unsharded. This is needed later in the test to avoid
    // triggering a refresh when a change stream is established through mongos2.
    // (mongos2 is st.s2, the third router.)
    const mongos2DB = st.s2.getDB(testName);
    const mongos2Coll = mongos2DB[testName];
    assert.writeOK(mongos2Coll.insert({_id: 0, a: 0}));

    // Create index on the shard key.
    assert.commandWorked(mongos2Coll.createIndex({a: 1}));

    // Shard the collection on {a: 1}. The command is issued through the first mongos only.
    assert.commandWorked(
        mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {a: 1}}));

    // Restart the primary shard and ensure that it is no longer aware that the collection is
    // sharded.
    st.restartShardRS(0);
    assert.eq(false, isShardAware(st.rs0.getPrimary(), mongosColl.getFullName()));

    // Handles through the second mongos (st.s1), which has not touched the collection so far.
    const mongos1DB = st.s1.getDB(testName);
    const mongos1Coll = mongos1DB[testName];

    // Establish change stream cursor on the second mongos, which is not aware that the
    // collection is sharded. No writes have happened since the stream was opened, so the first
    // batch must be empty.
    let cstMongos1 = new ChangeStreamTest(mongos1DB);
    let cursorMongos1 = cstMongos1.startWatchingChanges(
        {pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], collection: mongos1Coll});
    assert.eq(0, cursorMongos1.firstBatch.length, "Cursor had changes: " + tojson(cursorMongos1));

    // Establish a change stream cursor on the now sharded collection through the first mongos.
    // Both streams request fullDocument: "updateLookup"; the update check later in the test
    // expects the post-update document in the event.
    let cst = new ChangeStreamTest(mongosDB);
    let cursor = cst.startWatchingChanges(
        {pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], collection: mongosColl});
    assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));

    // Ensure that the primary shard is still unaware that the collection is sharded, i.e. that
    // opening the two change streams did not change what the shard reports.
    assert.eq(false, isShardAware(st.rs0.getPrimary(), mongosColl.getFullName()));

    // Insert a doc and verify that the primary shard is now aware that the collection is sharded.
    // The insert is routed through the first mongos.
    assert.writeOK(mongosColl.insert({_id: 1, a: 1}));
    assert.eq(true, isShardAware(st.rs0.getPrimary(), mongosColl.getFullName()));

    // Verify that both cursors are able to pick up an inserted document.
    // The documentKey is expected to contain the shard key field 'a' as well as '_id'.
    cst.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: [{
            documentKey: {_id: 1, a: 1},
            fullDocument: {_id: 1, a: 1},
            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
            operationType: "insert",
        }]
    });
    // The cursor on the second mongos is read one event at a time and checked field by field.
    // The variable is declared with 'let' because it is reassigned when the update event is read
    // later in the test.
    let mongos1ChangeDoc = cstMongos1.getOneChange(cursorMongos1);
    assert.docEq({_id: 1, a: 1}, mongos1ChangeDoc.documentKey);
    assert.docEq({_id: 1, a: 1}, mongos1ChangeDoc.fullDocument);
    assert.eq({db: mongos1DB.getName(), coll: mongos1Coll.getName()}, mongos1ChangeDoc.ns);
    assert.eq("insert", mongos1ChangeDoc.operationType);

    // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
    assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {a: 0}}));

    // Move a chunk to the non-primary shard. 'find: {a: -1}' selects the chunk containing
    // a == -1, i.e. [MinKey, 0), so documents with a >= 0 stay on the primary shard.
    // NOTE(review): _waitForDelete presumably makes the command wait for the donor shard's range
    // cleanup before returning -- confirm against the moveChunk documentation.
    assert.commandWorked(mongosDB.adminCommand({
        moveChunk: mongosColl.getFullName(),
        find: {a: -1},
        to: st.rs1.getURL(),
        _waitForDelete: true
    }));

    // Update the document on the primary shard ({a: 1} falls in [0, MaxKey)).
    assert.writeOK(mongosColl.update({_id: 1, a: 1}, {$set: {b: 1}}));
    // Insert another document to each shard: a == -2 belongs to the moved chunk, a == 2 to the
    // chunk that stayed behind. The later checks expect the events in this same order.
    assert.writeOK(mongosColl.insert({_id: -2, a: -2}));
    assert.writeOK(mongosColl.insert({_id: 2, a: 2}));

    // Verify that both cursors pick up the update to the first inserted doc regardless of the
    // moveChunk operation. The expected fullDocument is the post-update document, since both
    // streams were opened with fullDocument: "updateLookup".
    cst.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: [{
            documentKey: {_id: 1, a: 1},
            fullDocument: {_id: 1, a: 1, b: 1},
            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
            operationType: "update",
            updateDescription: {removedFields: [], updatedFields: {b: 1}}
        }]
    });
    // Keep the update event seen through the second mongos: its _id is used further down as the
    // resume token for the stream opened on mongos2.
    mongos1ChangeDoc = cstMongos1.getOneChange(cursorMongos1);
    assert.docEq({_id: 1, a: 1}, mongos1ChangeDoc.documentKey);
    assert.docEq({_id: 1, a: 1, b: 1}, mongos1ChangeDoc.fullDocument);
    assert.eq({db: mongos1DB.getName(), coll: mongos1Coll.getName()}, mongos1ChangeDoc.ns);
    assert.eq("update", mongos1ChangeDoc.operationType);

    // Restart the primary shard and ensure that it is no longer aware that the collection is
    // sharded.
    st.restartShardRS(0);
    assert.eq(false, isShardAware(st.rs0.getPrimary(), mongosColl.getFullName()));

    // Establish change stream cursor on mongos2 using the resume token from the change stream on
    // mongos1. Mongos2 is aware that the collection exists and thinks that it's unsharded, so it
    // won't trigger a routing table refresh. This must be done using a resume token from an update
    // otherwise the shard will generate the documentKey based on the assumption that the shard key
    // is _id which will cause the cursor establishment to fail due to SERVER-32085.
    //
    // At this point mongos1ChangeDoc holds the update event read above, so resuming after its _id
    // should replay only the two inserts that followed the update.
    let cstMongos2 = new ChangeStreamTest(mongos2DB);
    let cursorMongos2 = cstMongos2.startWatchingChanges({
        pipeline: [{$changeStream: {resumeAfter: mongos1ChangeDoc._id}}],
        collection: mongos2Coll
    });

    // The resumed stream on mongos2 must deliver the two inserts issued after the update, one
    // event at a time and in write order: first the document with a == -2, then the one with
    // a == 2. Both documents use the same value for _id and for the shard key 'a'.
    for (const shardKeyValue of [-2, 2]) {
        cstMongos2.assertNextChangesEqual({
            cursor: cursorMongos2,
            expectedChanges: [{
                documentKey: {_id: shardKeyValue, a: shardKeyValue},
                fullDocument: {_id: shardKeyValue, a: shardKeyValue},
                ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()},
                operationType: "insert",
            }]
        });
    }

    // Shut the cluster down.
    st.stop();

})();