summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_streams_establishment_finds_new_shards.js
blob: e79f6030e6117a58cc77b9eb07321471d7c1c6b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Tests that change streams are able to find and return results from new shards which are added
// during cursor establishment.
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
// ]
(function() {
'use strict';

// Failpoint configured on mongos below. Judging by its name and the way it is used here, it holds
// the aggregate right before the per-shard cursors are established.
const kFailPoint = "shardedAggregateHangBeforeEstablishingShardCursors";

// Write periodic no-ops once per second; the more frequent no-ops make the test finish sooner.
const nodeOpts = {
    setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true},
};

// One shard (a single-node replica set) behind one mongos.
const cluster = new ShardingTest({
    shards: 1,
    mongos: 1,
    rs: {nodes: 1},
    other: {rsOptions: nodeOpts},
});

jsTestLog("Starting new shard (but not adding to shard set yet)");
const extraShard = new ReplSetTest({name: "newShard", nodes: 1, nodeOptions: nodeOpts});
extraShard.startSet({shardsvr: ''});
extraShard.initiate();

const router = cluster.s;
const testDB = router.getDB("test");
const coll = router.getCollection('test.foo');
const ns = coll.getFullName();

// mongos has to know about the database before a cursor can be opened, so enable sharding on it.
assert.commandWorked(router.adminCommand({enableSharding: testDB.getName()}));

// Shard the collection on _id and split it at _id: 10, giving the chunks [MinKey, 10) and
// [10, MaxKey).
assert.commandWorked(router.adminCommand({shardCollection: ns, key: {_id: 1}}));
assert.commandWorked(router.adminCommand({split: ns, middle: {_id: 10}}));

// Arm the failpoint before the change stream is opened.
assert.commandWorked(router.adminCommand({configureFailPoint: kFailPoint, mode: "alwaysOn"}));

// Script for the parallel shell: once the log shows the failpoint was hit, add the new shard, move
// the upper chunk onto it, and then switch the failpoint off so the aggregate below can proceed.
const addShardScript = `
        checkLog.contains(db,
            "${kFailPoint} fail point enabled");
        assert.commandWorked(
            db.adminCommand({addShard: "${extraShard.getURL()}", name: "${extraShard.name}"}));
        // Migrate the [10, MaxKey] chunk to "newShard".
        assert.commandWorked(db.adminCommand({moveChunk: "${ns}",
                                              find: {_id: 20},
                                              to: "${extraShard.name}",
                                              _waitForDelete: true}));
        assert.commandWorked(
            db.adminCommand(
                {configureFailPoint: "${kFailPoint}",
                 mode: "off"}));`;
const joinShell = startParallelShell(addShardScript, router.port);

jsTestLog("Opening $changeStream cursor");
const stream = coll.aggregate([{$changeStream: {}}]);
assert(!stream.hasNext(), "Do not expect any results yet");

// Wait for the parallel shell to finish.
joinShell();

// Write one document into each chunk's range, i.e. one per shard.
const insertMajority = (doc) =>
    assert.commandWorked(coll.insert(doc, {writeConcern: {w: "majority"}}));
insertMajority({_id: 0});
insertMajority({_id: 20});

// Both inserts must come back from the stream, in the order they were written.
[0, 20].forEach((expectedId) => {
    jsTestLog(`Expecting Item ${expectedId}`);
    assert.soon(() => stream.hasNext());
    const event = stream.next();
    assert.eq(event.operationType, "insert");
    assert.eq(event.documentKey, {_id: expectedId});
});
assert(!stream.hasNext());

cluster.stop();
extraShard.stopSet();
})();