summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_unsharded_becomes_sharded.js
blob: 9998ef9bcdaad9e32681e9f96998effcbe631f55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Tests the behavior of change streams on a collection that was initially unsharded but then
// becomes sharded. In particular, test that post-shardCollection inserts update their cached
// 'documentKey' to include the new shard key, and that a resume token obtained prior to the
// shardCollection command can be used to resume the stream even after the collection has been
// sharded.
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
//   # TODO SERVER-30784: Remove 'multiversion_incompatible' tag and
//   # 'throwChangeStreamTopologyChangeExceptionToClient'.
//   multiversion_incompatible
// ]
(function() {
"use strict";

load('jstests/libs/change_stream_util.js');  // For ChangeStreamTest.

const testName = "change_streams_unsharded_becomes_sharded";
const st = new ShardingTest({
    shards: 2,
    mongos: 1,
    rs: {
        nodes: 1,
        enableMajorityReadConcern: '',
        // Use a higher frequency for periodic noops to speed up the test.
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
    }
});

const mongosDB = st.s0.getDB("test");
const mongosColl = mongosDB[testName];

function testUnshardedBecomesSharded(collToWatch) {
    mongosColl.drop();
    mongosDB.createCollection(testName);
    mongosColl.createIndex({x: 1});

    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());

    // Establish a change stream cursor on the unsharded collection.
    const cst = new ChangeStreamTest(mongosDB);

    // Create a different collection in the same database, and verify that it doesn't affect the
    // results of the change stream.
    const mongosCollOther = mongosDB[testName + "other"];
    mongosCollOther.drop();
    mongosDB.createCollection(testName + "other");
    mongosCollOther.createIndex({y: 1});

    let cursor = cst.startWatchingChanges({
        pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
        collection: collToWatch
    });
    assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));

    // Verify that the cursor picks up documents inserted while the collection is unsharded. The
    // 'documentKey' at this point is simply the _id field.
    assert.commandWorked(mongosColl.insert({_id: 0, x: 0}));
    assert.commandWorked(mongosCollOther.insert({_id: 0, y: 0}));
    const [preShardCollectionChange] = cst.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: [{
            documentKey: {_id: 0},
            fullDocument: {_id: 0, x: 0},
            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
            operationType: "insert",
        }]
    });

    // Record the resume token for this change, before the collection is sharded.
    const preShardCollectionResumeToken = preShardCollectionChange._id;

    // Shard the test collection with shard key {x: 1} and split into 2 chunks.
    st.shardColl(mongosColl.getName(), {x: 1}, {x: 0}, false, mongosDB.getName());

    // Shard the other collection with shard key {y: 1} and split into 2 chunks.
    st.shardColl(mongosCollOther.getName(), {y: 1}, {y: 0}, false, mongosDB.getName());

    // List the changes we expect to see for the next two operations on the sharded collection.
    // Later, we will resume the stream using the token generated before the collection was
    // sharded, and will need to confirm that we can still see these two changes.
    const postShardCollectionChanges = [
        {
            documentKey: {x: 1, _id: 1},
            fullDocument: {_id: 1, x: 1},
            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
            operationType: "insert",
        },
        {
            documentKey: {x: -1, _id: -1},
            fullDocument: {_id: -1, x: -1},
            ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
            operationType: "insert",
        }
    ];

    // Verify that the cursor on the original shard is still valid and sees new inserted
    // documents. The 'documentKey' field should now include the shard key.
    assert.commandWorked(mongosColl.insert({_id: 1, x: 1}));
    assert.commandWorked(mongosCollOther.insert({_id: 1, y: 1}));
    cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[0]]});

    // Move the [minKey, 0) chunk to shard1.
    assert.commandWorked(mongosDB.adminCommand({
        moveChunk: mongosColl.getFullName(),
        find: {x: -1},
        to: st.rs1.getURL(),
        _waitForDelete: true
    }));
    assert.commandWorked(mongosDB.adminCommand({
        moveChunk: mongosCollOther.getFullName(),
        find: {y: -1},
        to: st.rs1.getURL(),
        _waitForDelete: true
    }));

    // Make sure the change stream cursor sees a document inserted on the recipient shard.
    assert.commandWorked(mongosColl.insert({_id: -1, x: -1}));
    assert.commandWorked(mongosCollOther.insert({_id: -1, y: -1}));
    cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [postShardCollectionChanges[1]]});

    // Confirm that we can resume the stream on the sharded collection using the token generated
    // while the collection was unsharded, whose documentKey contains the _id field but not the
    // shard key.
    let resumedCursor = cst.startWatchingChanges({
        pipeline: [{$changeStream: {resumeAfter: preShardCollectionResumeToken}}],
        collection: mongosColl
    });

    // Verify that we see both of the insertions which occurred after the collection was
    // sharded.
    cst.assertNextChangesEqual(
        {cursor: resumedCursor, expectedChanges: postShardCollectionChanges});

    // Test the behavior of a change stream when a sharded collection is dropped and recreated.
    cursor = cst.startWatchingChanges({
        pipeline: [{$changeStream: {}}, {$match: {"ns.coll": mongosColl.getName()}}],
        collection: collToWatch
    });
    assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));

    // Insert a couple documents to shard1, creating a scenario where the getMore to shard0 will
    // indicate that the change stream is invalidated yet shard1 will still have data to return.
    assert.commandWorked(mongosColl.insert({_id: -2, x: -2}));
    assert.commandWorked(mongosColl.insert({_id: -3, x: -3}));

    // Drop and recreate the collection.
    mongosColl.drop();
    mongosDB.createCollection(mongosColl.getName());
    mongosColl.createIndex({z: 1});

    // Shard the collection on a different shard key and ensure that each shard has a chunk.
    st.shardColl(mongosColl.getName(), {z: 1}, {z: 0}, {z: -1}, mongosDB.getName());

    assert.commandWorked(mongosColl.insert({_id: -1, z: -1}));
    assert.commandWorked(mongosColl.insert({_id: 1, z: 1}));

    // Verify that the change stream picks up the inserts. The shard keys are present since they are
    // recorded in the oplog.
    cst.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: [
            {
                documentKey: {x: -2, _id: -2},
                fullDocument: {_id: -2, x: -2},
                ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                operationType: "insert",
            },
            {
                documentKey: {x: -3, _id: -3},
                fullDocument: {_id: -3, x: -3},
                ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                operationType: "insert",
            }
        ]
    });

    // Verify that the kNewShardDetected event is successfully delivered to mongoS even in cases
    // where the event does not match the user's filter.
    // TODO SERVER-30784: remove this test-case, or rework it without the failpoint, when the
    // kNewShardDetected event is the only way we detect a new shard for the collection.
    mongosDB.adminCommand(
        {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "alwaysOn"});
    ChangeStreamTest.assertChangeStreamThrowsCode({
        db: mongosDB,
        collName: collToWatch,
        pipeline: [
            {$changeStream: {resumeAfter: preShardCollectionResumeToken}},
            {$match: {operationType: "delete"}}
        ],
        expectedCode: ErrorCodes.ChangeStreamTopologyChange
    });
    mongosDB.adminCommand(
        {configureFailPoint: "throwChangeStreamTopologyChangeExceptionToClient", mode: "off"});

    cst.cleanUp();
}

// First test against a change stream on a single collection.
testUnshardedBecomesSharded(mongosColl.getName());

// Test against a change stream on the entire database.
testUnshardedBecomesSharded(1);

st.stop();
})();