path: root/jstests/noPassthrough/change_streams_resume_at_same_clustertime.js
/**
 * Tests that a change stream can be resumed from the higher of two resume tokens on separate
 * shards whose clusterTime is identical, differing only by documentKey, without causing the
 * postBatchResumeToken (PBRT) sent to the mongoS to move backwards in time.
 * @tags: [requires_replication, requires_journaling, requires_majority_read_concern]
 */
(function() {
    "use strict";

    const st =
        new ShardingTest({shards: 2, rs: {nodes: 1, setParameter: {writePeriodicNoops: false}}});
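    // Note: periodic no-op writes are disabled above so that each shard's oplog timestamp
    // advances only in response to this test's own writes, which makes it more likely that
    // corresponding events on the two shards will carry an identical clusterTime.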

    const mongosDB = st.s.startSession({causalConsistency: true}).getDatabase(jsTestName());
    const mongosColl = mongosDB.test;

    // Enable sharding on the test DB and ensure its primary is shard0.
    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
    st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());

    // Shard on {_id:1}, split at {_id:0}, and move the upper chunk to shard1.
    st.shardColl(mongosColl, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName(), true);

    // Write one document to each shard.
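    // Each subsequent {multi: true} update will therefore generate exactly one update event on
    // each shard.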
    assert.commandWorked(mongosColl.insert({_id: -10}));
    assert.commandWorked(mongosColl.insert({_id: 10}));

    // Open a change stream cursor to listen for subsequent events.
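    // The small batchSize here mirrors the resumed stream below, where it is used to induce the
    // shards to return each individual event to the mongoS.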
    let csCursor = mongosColl.watch([], {cursor: {batchSize: 1}});

    // Update both documents in the collection, such that the events are likely to have the same
    // clusterTime. We update twice to ensure that the PBRT for both shards moves past the first two
    // updates.
    assert.commandWorked(mongosColl.update({}, {$set: {updated: 1}}, {multi: true}));
    assert.commandWorked(mongosColl.update({}, {$set: {updatedAgain: 1}}, {multi: true}));

    // Retrieve the first two events and confirm that they arrive in order with non-descending
    // clusterTime. We cannot guarantee that the clusterTimes will be identical, since each is
    // based on its shard's local value and operations other than periodic no-op writes can also
    // bump the oplog timestamp. However, we expect them to be identical on most test runs, so
    // the check is still worthwhile.
    let clusterTime = null, updateEvent = null;
    for (let x = 0; x < 2; ++x) {
        assert.soon(() => csCursor.hasNext());
        updateEvent = csCursor.next();
        clusterTime = (clusterTime || updateEvent.clusterTime);
        assert.gte(updateEvent.clusterTime, clusterTime);
        assert.eq(updateEvent.updateDescription.updatedFields.updated, 1);
    }
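    // Confirm that the first of the 'updatedAgain' events is already available before we move on
    // to resume from the second 'updated' event.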
    assert.soon(() => csCursor.hasNext());

    // Update both documents again, so that we will have something to observe after resuming.
    assert.commandWorked(mongosColl.update({}, {$set: {updatedYetAgain: 1}}, {multi: true}));

    // Resume from the second update, and confirm that we only see events starting with the third
    // and fourth updates. We use batchSize:1 to induce mongoD to send each individual event to the
    // mongoS when resuming, rather than scanning all the way to the most recent point in its oplog.
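    // Note that 'updateEvent' is the second of the two (likely) same-clusterTime events, so this
    // resume exercises the scenario described in the header comment. Log the resume token to aid
    // debugging if the assertions below fail.
    jsTestLog("Resuming change stream after event with resume token: " + tojson(updateEvent._id));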
    csCursor = mongosColl.watch([], {resumeAfter: updateEvent._id, cursor: {batchSize: 1}});
    clusterTime = updateEvent = null;
    for (let x = 0; x < 2; ++x) {
        assert.soon(() => csCursor.hasNext());
        updateEvent = csCursor.next();
        clusterTime = (clusterTime || updateEvent.clusterTime);
        assert.gte(updateEvent.clusterTime, clusterTime);
        assert.eq(updateEvent.updateDescription.updatedFields.updatedAgain, 1);
    }
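    // Having consumed both 'updatedAgain' events, the next event visible on the resumed stream
    // should be the first of the 'updatedYetAgain' updates.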
    assert.soon(() => csCursor.hasNext());

    st.stop();
})();
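
// To run this test locally, an invocation along the following lines is typical; the exact
// resmoke.py arguments vary between branches, so treat this as illustrative rather than exact:
//   python buildscripts/resmoke.py --suites=no_passthrough \
//       jstests/noPassthrough/change_streams_resume_at_same_clustertime.js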