// path: root/jstests/sharding/change_streams_establishment_finds_new_shards.js
// blob: 45bc1583e4635d7cf68652a9dded4e2c174ecfe0 (plain)
// Tests that change streams is able to find and return results from new shards which are added
// during cursor establishment.
// @tags: [uses_change_streams]
(function() {
    'use strict';

    // Provides supportsMajorityReadConcern().
    load("jstests/multiVersion/libs/causal_consistency_helpers.js");

    // Bail out early when majority read concern is unavailable.
    const canReadMajority = supportsMajorityReadConcern();
    if (!canReadMajority) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        return;
    }

    // Shared by every replica set node in this test: write periodic noops once per second so
    // that the test finishes faster.
    const noopWriterOptions = {
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
    };

    // Start with a single-shard cluster fronted by one mongos.
    const cluster = new ShardingTest({
        shards: 1,
        mongos: 1,
        rs: {nodes: 1},
        other: {rsOptions: noopWriterOptions}
    });

    // Bring up a second replica set as a shard server. It only joins the cluster later, from the
    // parallel shell, while the change stream cursor is being established.
    jsTestLog("Starting new shard (but not adding to shard set yet)");
    const extraShard = new ReplSetTest({
        name: "newShard",
        nodes: 1,
        nodeOptions: noopWriterOptions
    });
    extraShard.startSet({shardsvr: ''});
    extraShard.initiate();

    const router = cluster.s;
    const shardedColl = router.getCollection('test.foo');
    const testDB = router.getDB("test");
    const collNs = shardedColl.getFullName();

    // Runs an admin command through mongos and asserts that it succeeded.
    const runAdminCommand = (cmd) => assert.commandWorked(router.adminCommand(cmd));

    // Enabling sharding makes mongos aware of the database, which lets us open a cursor on it.
    runAdminCommand({enableSharding: testDB.getName()});

    // Shard the collection on _id and split it into two chunks: [MinKey, 10) and [10, MaxKey].
    runAdminCommand({shardCollection: collNs, key: {_id: 1}});
    runAdminCommand({split: collNs, middle: {_id: 10}});

    // Turn the failpoint on before the aggregation is issued; the parallel shell below turns it
    // off again once the new shard owns a chunk.
    runAdminCommand({
        configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors",
        mode: "alwaysOn"
    });

    // Script for the parallel shell: wait until the log shows that the failpoint was hit, add
    // the new shard, move the upper chunk onto it, and finally release the failpoint.
    const extraShardURL = extraShard.getURL();
    const extraShardName = extraShard.name;
    const addShardScript = `
        load("jstests/libs/check_log.js");
        checkLog.contains(db,
            "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled");
        assert.commandWorked(
            db.adminCommand({addShard: "${extraShardURL}", name: "${extraShardName}"}));
        // Migrate the [10, MaxKey] chunk to "newShard".
        assert.commandWorked(db.adminCommand({moveChunk: "${collNs}",
                                              find: {_id: 20},
                                              to: "${extraShardName}",
                                              _waitForDelete: true}));
        assert.commandWorked(
            db.adminCommand(
                {configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors",
                 mode: "off"}));`;
    const awaitNewShard = startParallelShell(addShardScript, router.port);

    // Open the change stream while the parallel shell is reshaping the cluster.
    jsTestLog("Opening $changeStream cursor");
    const stream = shardedColl.aggregate([{$changeStream: {}}]);
    assert(!stream.hasNext(), "Do not expect any results yet");

    // Join the parallel shell.
    awaitNewShard();

    // One _id on each side of the split point at 10, so the two documents land in different
    // chunks (and therefore on different shards after the migration above).
    const insertedIds = [0, 20];
    insertedIds.forEach((docId) => {
        assert.writeOK(shardedColl.insert({_id: docId}, {writeConcern: {w: "majority"}}));
    });

    // Both inserts must be reported by the stream, in insertion order, and nothing else.
    for (const expectedId of insertedIds) {
        jsTestLog("Expecting Item " + expectedId);
        assert.soon(() => stream.hasNext());
        const event = stream.next();
        assert.eq(event.operationType, "insert");
        assert.eq(event.documentKey, {_id: expectedId});
    }
    assert(!stream.hasNext());

    // Tear down the cluster first, then the separately started replica set.
    cluster.stop();
    extraShard.stopSet();
})();