summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_shards_start_in_sync.js
blob: 32cdab274963397d1740fcb72f6caa13870c765f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// This test ensures that change streams on sharded collections start in sync with each other.
//
// As detailed in SERVER-31685, shard cursors are not established simultaneously. A sharded change
// stream could therefore be established on shard 0, after which write 'A' to shard 0 and then
// write 'B' to shard 1 occur; only then is the change stream established on shard 1, followed by
// some third write 'C'. This test ensures that in that case, both 'A' and 'B' will be seen in the
// change stream before 'C'.
// @tags: [uses_change_streams, requires_majority_read_concern]
(function() {
"use strict";

// Each shard is a single-node replica set that writes a no-op every second; the higher no-op
// frequency speeds up the test.
const shardRSOptions = {
    nodes: 1,
    setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1},
};

// Two shards behind two mongos processes. The bridge is presumably what allows the
// disconnect()/reconnect() calls made on the shard 1 primary later in this test.
const st = new ShardingTest({shards: 2, mongos: 2, useBridge: true, rs: shardRSOptions});

// The database and the collection are both named after the test and are reached through the
// first mongos (s0).
const testName = jsTestName();
const mongosDB = st.s0.getDB(testName);
const mongosColl = mongosDB[testName];
const dbName = mongosDB.getName();
const collNS = mongosColl.getFullName();

// Enable sharding on the test database and make shard 0 its primary shard.
assert.commandWorked(mongosDB.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.rs0.getURL());

// Shard the collection on _id, then split it at 0 into [MinKey, 0) and [0, MaxKey).
assert.commandWorked(mongosDB.adminCommand({shardCollection: collNS, key: {_id: 1}}));
assert.commandWorked(mongosDB.adminCommand({split: collNS, middle: {_id: 0}}));

// Hand the [0, MaxKey) chunk (the one containing _id: 1) to shard 1.
assert.commandWorked(
    mongosDB.adminCommand({moveChunk: collNS, find: {_id: 1}, to: st.rs1.getURL()}));

/**
 * Opens a change stream on the test collection and asserts that it delivers the inserts of
 * _id -1000, 1001 and -1002, in exactly that order, before closing the stream.
 *
 * This function is handed to startParallelShell, so it deliberately relies only on shell globals
 * (db, load, jsTestName, assert) and on the helper it loads itself, never on variables of the
 * enclosing test.
 */
function checkStream() {
    load('jstests/libs/change_stream_util.js');  // For assertChangeStreamEventEq.

    // Re-point the shell's global 'db' at the test database; the expected 'ns' below is derived
    // from it.
    db = db.getSiblingDB(jsTestName());
    const coll = db[jsTestName()];
    const changeStream = coll.aggregate([{$changeStream: {}}]);

    // The _ids of the inserts, in the order the stream must return them.
    const expectedIds = [-1000, 1001, -1002];

    for (const expectedId of expectedIds) {
        // Name the awaited document in the failure message: without it the three waits are
        // indistinguishable when one of them times out.
        assert.soon(() => changeStream.hasNext(),
                    `Timed out waiting for the change stream to return the insert of _id ${
                        expectedId}`);
        assertChangeStreamEventEq(changeStream.next(), {
            documentKey: {_id: expectedId},
            fullDocument: {_id: expectedId},
            ns: {db: db.getName(), coll: coll.getName()},
            operationType: "insert",
        });
    }

    changeStream.close();
}

// Cut the shard 1 primary off from the second mongos (s1), then open the $changeStream through
// s1 from a parallel shell.  All writes go through the first mongos (s0), which stays connected
// to every shard.
const shard1Primary = st.rs1.getPrimary();
shard1Primary.disconnect(st.s1);
const waitForShell = startParallelShell(checkStream, st.s1.port);

// Blocks until $currentOp on the primary of the given shard replica set ('rs') reports exactly one
// idle cursor on the test collection's namespace.
function waitForShardCursor(rs) {
    // Counts the idle cursors currently open on the test collection, as seen by the primary.
    const idleCursorCount = () => {
        const adminDB = rs.getPrimary().getDB('admin');
        const pipeline = [
            {"$currentOp": {"idleCursors": true}},
            {"$match": {ns: mongosColl.getFullName(), type: "idleCursor"}},
        ];
        return adminDB.aggregate(pipeline).itcount();
    };

    assert.soon(() => idleCursorCount() === 1);
}
// Do not write anything until shard 0 reports that its $changeStream cursor is established.
waitForShardCursor(st.rs0);

// Inserts a single {_id: id} document through the first mongos with majority write concern.
const insertWithMajority = (id) =>
    assert.commandWorked(mongosColl.insert({_id: id}, {writeConcern: {w: "majority"}}));

insertWithMajority(-1000);

// This write goes to shard 1 while that shard's $changeStream cursor is not open yet: the mongos
// running the $changeStream is still disconnected from shard 1.
insertWithMajority(1001);

jsTestLog("Reconnecting");
st.rs1.getPrimary().reconnect(st.s1);
waitForShardCursor(st.rs1);

// Written only after both shard cursors exist; the stream must return it last.
insertWithMajority(-1002);
waitForShell();
st.stop();
})();