summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_shards_start_in_sync.js
blob: 4cf64d5f54cebbdb1a916ab980f92c2369dd7c45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// This test ensures that change streams on sharded collections start in sync with each other.
//
// As detailed in SERVER-31685, since shard cursors are not established simultaneously, it is
// possible that a sharded change stream could be established on shard 0, then write 'A' to shard 0
// could occur, followed by write 'B' to shard 1, and then the change stream could be established on
// shard 1, then some third write 'C' could occur.  This test ensures that in that case, both 'A'
// and 'B' will be seen in the changestream before 'C'.
// @tags: [
//   does_not_support_stepdowns,
//   requires_majority_read_concern,
//   uses_change_streams,
// ]
(function() {
"use strict";

// Check the build flags to determine whether we are running in a code-coverage variant. These
// variants are sufficiently slow as to interfere with the operation of this test, so we skip them.
if (buildInfo().buildEnvironment.ccflags.includes('-ftest-coverage')) {
    jsTestLog("Skipping the test case run with code-coverage enabled");
    return;
}

const st = new ShardingTest({
    shards: 2,
    mongos: 2,
    useBridge: true,
    rs: {
        nodes: 1,
        // Use a higher frequency for periodic noops to speed up the test.
        setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
    }
});

const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];

// Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());

// Shard the test collection on _id.
assert.commandWorked(
    mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));

// Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));

// Move the [0, MaxKey) chunk to st.shard1.shardName.
assert.commandWorked(mongosDB.adminCommand(
    {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));

function checkStream() {
    load('jstests/libs/change_stream_util.js');  // For assertChangeStreamEventEq.

    db = db.getSiblingDB(jsTestName());
    let coll = db[jsTestName()];
    let changeStream = coll.aggregate([{$changeStream: {}}], {comment: jsTestName()});

    assert.soon(() => changeStream.hasNext());
    assertChangeStreamEventEq(changeStream.next(), {
        documentKey: {_id: -1000},
        fullDocument: {_id: -1000},
        ns: {db: db.getName(), coll: coll.getName()},
        operationType: "insert",
    });

    assert.soon(() => changeStream.hasNext());
    assertChangeStreamEventEq(changeStream.next(), {
        documentKey: {_id: 1001},
        fullDocument: {_id: 1001},
        ns: {db: db.getName(), coll: coll.getName()},
        operationType: "insert",
    });

    assert.soon(() => changeStream.hasNext());
    assertChangeStreamEventEq(changeStream.next(), {
        documentKey: {_id: -1002},
        fullDocument: {_id: -1002},
        ns: {db: db.getName(), coll: coll.getName()},
        operationType: "insert",
    });
    changeStream.close();
}

// Helper function to list all idle cursors on the test namespace, with an optional extra filter.
function listIdleCursorsOnTestNs(rs, filter = {}) {
    return rs.getPrimary()
        .getDB('admin')
        .aggregate([
            {$currentOp: {idleCursors: true}},
            {
                $match: {
                    ns: mongosColl.getFullName(),
                    type: "idleCursor",
                }
            },
            {$match: filter}
        ])
        .toArray();
}

// Start the $changeStream with shard 1 unavailable on the second mongos (s1).  We will be
// writing through the first mongos (s0), which will remain connected to all shards.
st.rs1.getPrimary().disconnect(st.s1);
let waitForShell = startParallelShell(checkStream, st.s1.port);

// Helper function which waits for a $changeStream cursor to appear in currentOp on the given shard.
function waitForShardCursor(rs) {
    assert.soon(() => listIdleCursorsOnTestNs(rs, {
                          "cursor.originatingCommand.aggregate": {$exists: true},
                          "cursor.originatingCommand.comment": jsTestName()
                      }).length === 1,
                () => tojson(listIdleCursorsOnTestNs(rs)));
}

// Make sure the shard 0 $changeStream cursor is established before doing the first writes.
waitForShardCursor(st.rs0);

assert.commandWorked(mongosColl.insert({_id: -1000}, {writeConcern: {w: "majority"}}));

// This write to shard 1 occurs before the $changeStream cursor on shard 1 is open, because the
// mongos where the $changeStream is running is disconnected from shard 1.
assert.commandWorked(mongosColl.insert({_id: 1001}, {writeConcern: {w: "majority"}}));

jsTestLog("Reconnecting");
st.rs1.getPrimary().reconnect(st.s1);
waitForShardCursor(st.rs1);

assert.commandWorked(mongosColl.insert({_id: -1002}, {writeConcern: {w: "majority"}}));
waitForShell();
st.stop();
})();