summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_resume_shard_key_change.js
blob: 295cc18384c316354592fb45beee94ac4c525c3e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Tests resuming change streams when shard key is changed.
//
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
//   requires_fcv_53,
// ]

// Cannot run the filtering metadata check on tests that run refineCollectionShardKey.
TestData.skipCheckShardFilteringMetadata = true;

(function() {
"use strict";

const dbName = 'testDB';
const collName = 'testColl';

const st = new ShardingTest({
    shards: 2,
    rs: {
        nodes: 1,
        enableMajorityReadConcern: '',
        // Use the noop writer with a higher frequency for periodic noops to speed up the test.
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
    }
});

const db = st.s.getDB(dbName);
const coll = db.getCollection(collName);
const changeStream = coll.watch();

const docs = [
    {_id: 0, shardField1: 0, shardField2: 0},
    {_id: 1, shardField1: 1, shardField2: 0},
    {_id: 2, shardField1: 1, shardField2: 1},
    {_id: 3, shardField1: 0, shardField2: 1},
];
const docKeys = [
    {_id: 0},
    {shardField1: 1, _id: 1},
    {shardField1: 1, shardField2: 1, _id: 2},
    {shardField1: 0, shardField2: 1, _id: 3},
];

// Document inserted in unsharded collection.
assert.commandWorked(coll.insert(docs[0]));

// Document inserted in sharded collection.
assert.commandWorked(coll.createIndex({shardField1: 1}));
st.shardColl(collName,
             {shardField1: 1} /* Shard key */,
             {shardField1: 1} /* Split at */,
             {shardField1: 1} /* Move the chunk containing {shardField1: 1} to its own shard */,
             dbName,
             true /* Wait until documents orphaned by the move get deleted */);
assert.commandWorked(coll.insert(docs[1]));

// Document inserted in shard key refined collection.
assert.commandWorked(coll.createIndex({shardField1: 1, shardField2: 1}));
assert.commandWorked(db.adminCommand({
    refineCollectionShardKey: `${dbName}.${collName}`,
    key: {shardField1: 1, shardField2: 1},
}));
assert.commandWorked(coll.insert(docs[2]));

// Insert one more document as a final sentinel, to ensure that there is always at least one visible
// event following the resume points we wish to test.
assert.commandWorked(coll.insert(docs[3]));

const verifyChanges = (changeStream, startingIndex) => {
    const changes = [];
    assert.soon(() => {
        while (changeStream.hasNext()) {
            changes.push(changeStream.next());
        }
        return changes.length === docs.length - startingIndex;
    });
    assert.docEq(docs.slice(startingIndex), changes.map(x => x.fullDocument));
    assert.docEq(docKeys.slice(startingIndex), changes.map(x => x.documentKey));
    return changes;
};

// Verify that we can resume from each change point.
const changes = verifyChanges(changeStream, 0);
changes.forEach((change, i) => {
    verifyChanges(coll.watch([], {resumeAfter: change._id}), i + 1);
});

st.stop();
})();