summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_resume_token_applyops_overlap.js
blob: 4f0be761570e966912d805ee8943d138b15f83c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
 * Confirms that resuming from an event which has the same clusterTime as a transaction on another
 * shard does not cause the resume attempt to be prematurely rejected. Reproduction script for the
 * bug described in SERVER-40094.
 * @tags: [
 *   requires_sharding,
 *   uses_multi_shard_transaction,
 *   uses_transactions,
 * ]
 */
(function() {
"use strict";

// Asserts that the expected operation type and documentKey are found on the change stream
// cursor. Returns the change stream document.
function assertWriteVisible({cursor, opType, docKey}) {
    assert.soon(() => cursor.hasNext());
    const changeDoc = cursor.next();
    assert.eq(opType, changeDoc.operationType, changeDoc);
    assert.eq(docKey, changeDoc.documentKey, changeDoc);
    return changeDoc;
}

// Create a new cluster with 2 shards. Enable 1-second period no-ops to ensure that all relevant
// events eventually become available.
const st = new ShardingTest({
    shards: 2,
    rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
});

const mongosDB = st.s.getDB(jsTestName());
const mongosColl = mongosDB.test;

// Enable sharding on the test DB and ensure its primary is shard0.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());

// Shard on {shard:1}, split at {shard:1}, and move the upper chunk to shard1.
st.shardColl(mongosColl, {shard: 1}, {shard: 1}, {shard: 1}, mongosDB.getName(), true);

// Seed each shard with one document.
assert.commandWorked(
    mongosColl.insert([{shard: 0, _id: "initial_doc"}, {shard: 1, _id: "initial doc"}]));

// Start a transaction which will be used to write documents across both shards.
const session = mongosDB.getMongo().startSession();
const sessionDB = session.getDatabase(mongosDB.getName());
const sessionColl = sessionDB[mongosColl.getName()];
session.startTransaction({readConcern: {level: "majority"}});

// Open a change stream on the test collection. We will capture events in 'changeList'.
const changeStreamCursor = mongosColl.watch();
const changeList = [];

// Insert four documents on each shard under the transaction.
assert.commandWorked(
    sessionColl.insert([{shard: 0, _id: "txn1-doc-0"}, {shard: 1, _id: "txn1-doc-1"}]));
assert.commandWorked(
    sessionColl.insert([{shard: 0, _id: "txn1-doc-2"}, {shard: 1, _id: "txn1-doc-3"}]));
assert.commandWorked(
    sessionColl.insert([{shard: 0, _id: "txn1-doc-4"}, {shard: 1, _id: "txn1-doc-5"}]));
assert.commandWorked(
    sessionColl.insert([{shard: 0, _id: "txn1-doc-6"}, {shard: 1, _id: "txn1-doc-7"}]));

// Commit the transaction.
assert.commandWorked(session.commitTransaction_forTesting());

// Read the stream of events, capture them in 'changeList', and confirm that all events occurred
// at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
// all events occurred at the same clusterTime on both shards, even in the case where all events
// occur within a single transaction. We expect, however, that this will be true in the vast
// majority of test runs, and so there is value in retaining this test.
for (let i = 0; i < 8; ++i) {
    assert.soon(() => changeStreamCursor.hasNext());
    changeList.push(changeStreamCursor.next());
}
const clusterTime = changeList[0].clusterTime;
for (let event of changeList) {
    assert.gte(event.clusterTime, clusterTime);
}

// Test that resuming from each event returns the expected set of subsequent documents.
for (let i = 0; i < changeList.length; ++i) {
    const resumeCursor = mongosColl.watch([], {startAfter: changeList[i]._id});

    // Confirm that the first event in the resumed stream matches the next event recorded in
    // 'changeList' from the original stream. The order of the events should be stable across
    // resumes from any point.
    for (let x = (i + 1); x < changeList.length; ++x) {
        const expectedChangeDoc = changeList[x];
        assertWriteVisible({
            cursor: resumeCursor,
            opType: expectedChangeDoc.operationType,
            docKey: expectedChangeDoc.documentKey
        });
    }
    assert(!resumeCursor.hasNext(), () => `Unexpected event: ${tojson(resumeCursor.next())}`);
    resumeCursor.close();
}

st.stop();
})();