path: root/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
blob: 7fd5aa2abc4d977838f078edd7ab1ed5fbf056da (plain)
/**
 * Confirms that resuming from an event which has the same clusterTime as, but a different UUID
 * than, an event on another shard does not cause the resume attempt to be prematurely rejected.
 * Reproduction script for the bug described in SERVER-40094.
 * @tags: [requires_sharding, uses_change_streams]
 */
(function() {
    "use strict";

    // Asserts that the expected operation type and documentKey are found on the change stream
    // cursor. Returns the change stream document.
    function assertWriteVisible({cursor, opType, docKey}) {
        assert.soon(() => cursor.hasNext());
        const changeDoc = cursor.next();
        assert.eq(opType, changeDoc.operationType, changeDoc);
        assert.eq(docKey, changeDoc.documentKey, changeDoc);
        return changeDoc;
    }

    // Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control
    // over the ordering of events across the cluster.
    const st = new ShardingTest({
        shards: 2,
        rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}}
    });

    // Create two databases. We will place one of these on each shard.
    const mongosDB0 = st.s.getDB(`${jsTestName()}_0`);
    const mongosDB1 = st.s.getDB(`${jsTestName()}_1`);
    const adminDB = st.s.getDB("admin");

    // Enable sharding on mongosDB0 and ensure its primary is shard0.
    assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()}));
    st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL());

    // Enable sharding on mongosDB1 and ensure its primary is shard1.
    assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()}));
    st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL());

    // Open a connection to a different collection on each shard. We use direct connections to
    // ensure that the oplog timestamps across the shards overlap.
    const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`);
    const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`);

    // Open a change stream on the test cluster. We will capture events in 'changeList'.
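    // With 'allChangesForCluster' set, the stream must be opened against the 'admin' database.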
    const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
    const changeList = [];

    // Insert ten documents on each shard, alternating between the two collections.
    for (let i = 0; i < 20; ++i) {
        const coll = (i % 2 ? coll1 : coll0);
        assert.commandWorked(coll.insert({shard: (i % 2)}));
    }

    // Verify that each shard now has ten total documents present in the associated collection.
    assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10);
    assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10);

    // Read the stream of events, capture them in 'changeList', and confirm that all events occurred
    // at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
    // corresponding events occurred at the same clusterTime on both shards; we expect, however,
    // that this will be true in the vast majority of runs, and so there is value in testing.
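    // Note that we read only 19 of the 20 events: with periodic no-ops disabled, mongos cannot
    // return the final event from one shard until the other shard's oplog time advances past it.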
    for (let i = 0; i < 19; ++i) {
        assert.soon(() => changeStreamCursor.hasNext());
        changeList.push(changeStreamCursor.next());
    }
    const clusterTime = changeList[0].clusterTime;
    for (let event of changeList) {
        assert.gte(event.clusterTime, clusterTime);
    }

    // Test that resuming from each event returns the expected set of subsequent documents.
    for (let i = 0; i < changeList.length; ++i) {
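        // Resume a new cluster-wide stream from the '_id' resume token of the i-th event.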
        const resumeCursor = adminDB.aggregate(
            [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]);

        // Confirm that the resumed stream returns the remaining events recorded in 'changeList'
        // from the original stream, in the same order. The order of events should be stable across
        // resumes from any point.
        for (let x = (i + 1); x < changeList.length; ++x) {
            const expectedChangeDoc = changeList[x];
            assertWriteVisible({
                cursor: resumeCursor,
                opType: expectedChangeDoc.operationType,
                docKey: expectedChangeDoc.documentKey
            });
        }
        resumeCursor.close();
    }

    st.stop();
})();