summaryrefslogtreecommitdiff
path: root/jstests/sharding/resume_change_stream_from_stale_mongos.js
blob: fbc8bd904bb19b73b6e3b2f9e21a03ceac610e42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Tests that resuming a change stream on a collection that has since become sharded, via a mongos
// that believes the collection is still unsharded, will end up targeting the change stream to all
// shards after getting a stale shard version.
// @tags: [uses_change_streams]
(function() {
"use strict";

// For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");

if (!supportsMajorityReadConcern()) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    return;
}

/**
 * Consumes one event from 'cursor' for each entry of 'expectedIds', in order, asserting that
 * every event is an insert whose full document is exactly {_id: <expected id>}.
 *
 * Returns the '_id' field (the resume token) of the last event consumed, or null if
 * 'expectedIds' is empty.
 */
function assertNextInserts(cursor, expectedIds) {
    let lastResumeToken = null;
    for (const expectedId of expectedIds) {
        // Block until the stream has an event available before pulling it.
        assert.soon(() => cursor.hasNext());
        const event = cursor.next();
        assert.eq(event.operationType, "insert");
        assert.eq(event.fullDocument, {_id: expectedId});
        lastResumeToken = event._id;
    }
    return lastResumeToken;
}

// Create a 2-shard cluster. Enable 'writePeriodicNoops' and set 'periodicNoopIntervalSecs' to 1
// second so that each shard is continually advancing its optime, allowing the
// AsyncResultsMerger to return sorted results even if some shards have not yet produced any
// data.
const st = new ShardingTest({
    shards: 2,
    mongos: 2,
    rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
});

const firstMongosDB = st.s0.getDB(jsTestName());
const firstMongosColl = firstMongosDB.test;

// Enable sharding on the test DB and ensure its primary is shard 0.
assert.commandWorked(firstMongosDB.adminCommand({enableSharding: firstMongosDB.getName()}));
st.ensurePrimaryShard(firstMongosDB.getName(), st.rs0.getURL());

// Establish a change stream while the collection is unsharded, then shard the collection, move
// a chunk, and record a resume token after the first chunk migration.
const initialStream = firstMongosColl.aggregate([{$changeStream: {}}]);

assert.writeOK(firstMongosColl.insert({_id: -1}));
assert.writeOK(firstMongosColl.insert({_id: 1}));

assertNextInserts(initialStream, [-1, 1]);

// Shard the test collection on _id, split the collection into 2 chunks: [MinKey, 0) and
// [0, MaxKey), then move the [0, MaxKey) chunk to shard 1.
assert.commandWorked(
    firstMongosDB.adminCommand({shardCollection: firstMongosColl.getFullName(), key: {_id: 1}}));
assert.commandWorked(
    firstMongosDB.adminCommand({split: firstMongosColl.getFullName(), middle: {_id: 0}}));
assert.commandWorked(firstMongosDB.adminCommand(
    {moveChunk: firstMongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));

// Then do one insert to each shard.
assert.writeOK(firstMongosColl.insert({_id: -2}));
assert.writeOK(firstMongosColl.insert({_id: 2}));

// The change stream should see all the inserts after internally re-establishing cursors after
// the chunk split and migration. Keep the token of the last change as the resume point.
const resumeToken = assertNextInserts(initialStream, [-2, 2]);

// Do some writes that occur on each shard after the resume token.
assert.writeOK(firstMongosColl.insert({_id: -3}));
assert.writeOK(firstMongosColl.insert({_id: 3}));

// Now try to resume the change stream using a stale mongos which believes the collection is
// unsharded. The second mongos should use the shard versioning protocol to discover that the
// collection is no longer unsharded, and re-target to all shards in the cluster.
initialStream.close();
const secondMongosColl = st.s1.getDB(jsTestName()).test;
const resumedStream = secondMongosColl.aggregate([{$changeStream: {resumeAfter: resumeToken}}]);

// Verify we can see both inserts that occurred after the resume point.
assertNextInserts(resumedStream, [-3, 3]);

st.stop();
}());