summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_stream_unwind_batched_writes.js
blob: 0ee89a2b09bcf6fc5b7c21dde4a23154e85b323f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
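 * Verifies that a change stream reports batched writes (batched multi-document deletes) as
 * individual per-document events, and that the stream can be resumed after each of those events.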
 *
 * @tags: [
 *   requires_fcv_61,
 *   requires_replication,
 *   requires_majority_read_concern,
 *   uses_change_streams,
 * ]
 */
(function() {
"use strict";

// Names of the database and collection that runTest() operates on.
const dbName = "test";
const collName = "c";

/**
 * Waits for the next event on the change stream cursor and checks that it has the expected
 * operation type and documentKey, and that it carries neither 'lsid' nor 'txnNumber'.
 *
 * @param cursor change stream cursor to read from.
 * @param operationType expected value of the event's 'operationType' field.
 * @param documentKey expected value of the event's 'documentKey' field.
 * @returns the change stream event that was consumed.
 */
function assertWriteVisible(cursor, operationType, documentKey) {
    // Block until the stream has something to hand out, then consume exactly one event.
    assert.soon(function() {
        return cursor.hasNext();
    });
    const event = cursor.next();

    assert.eq(operationType, event.operationType, event);
    assert.eq(documentKey, event.documentKey, event);

    // Change stream events for batched writes do not include lsid and txnNumber.
    for (const sessionField of ['lsid', 'txnNumber']) {
        assert(!event.hasOwnProperty(sessionField));
    }

    return event;
}

/**
 * Same check as assertWriteVisible(), but additionally appends the consumed change stream
 * document to 'changeList' so the caller can inspect it later (for example, to use its '_id'
 * when resuming a stream). Returns nothing.
 */
function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) {
    changeList.push(assertWriteVisible(cursor, operationType, documentKey));
}

/**
 * Fails if the change stream cursor reports that another event is available. On failure, the
 * remaining events are drained from the cursor and included in the assertion message.
 */
function assertNoChanges(cursor) {
    const hasPendingEvent = cursor.hasNext();
    // Built lazily so the cursor is only drained when the assertion actually fails.
    const describeLeftovers = () => "Unexpected change set: " + tojson(cursor.toArray());
    assert(!hasPendingEvent, describeLeftovers);
}

/**
 * Populates a collection, mass-deletes it with batched deletes enabled, and checks that:
 *   - the 'batchedDeletes' serverStatus counters advance by the expected number of batches
 *     and documents;
 *   - a change stream opened before the delete reports one "delete" event per document, in
 *     ascending _id order, and nothing else;
 *   - a change stream resumed after each captured event replays exactly the later events.
 * Drops the test database when done.
 *
 * @param conn connection to the node to run against; must provide getDB().
 */
function runTest(conn) {
    const db = conn.getDB(dbName);
    const coll = db.getCollection(collName);

    const docsPerBatch = 3;
    const totalNumDocs = 8;
    const expectedBatches = Math.ceil(totalNumDocs / docsPerBatch);
    const changeList = [];

    // For consistent results, disable any batch targeting except for
    // 'batchedDeletesTargetBatchDocs'.
    assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
    assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0}));
    assert.commandWorked(
        db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));

    // Populate the collection, then open a change stream, then mass-delete the collection.
    const docsToInsert = Array.from({length: totalNumDocs}, (_, x) => ({_id: x, txt: "a" + x}));
    assert.commandWorked(coll.insertMany(docsToInsert, {ordered: false}));
    const changeStreamCursor = coll.watch();

    // Read both counters from a single serverStatus reply so they describe the same instant.
    const statsBefore = db.serverStatus()['batchedDeletes'];
    assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
    assert.eq(0, coll.find().itcount());
    const statsAfter = db.serverStatus()['batchedDeletes'];

    assert.eq(statsAfter['batches'], statsBefore['batches'] + expectedBatches);
    assert.eq(statsAfter['docs'], statsBefore['docs'] + totalNumDocs);

    // Verify the change stream emits events for the batched deletion, and capture the events so we
    // can test resumability later.
    for (let docKey = 0; docKey < totalNumDocs; docKey++) {
        assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList);
    }

    assertNoChanges(changeStreamCursor);
    changeStreamCursor.close();

    // Test that change stream resume returns the expected set of documents at each point
    // captured by this test.
    for (let i = 0; i < changeList.length; ++i) {
        // The '_id' of a captured event is used as the resume point.
        const resumeCursor = coll.watch([], {startAfter: changeList[i]._id});

        // Only the events captured after position 'i' should be replayed.
        for (const expectedChangeDoc of changeList.slice(i + 1)) {
            assertWriteVisible(
                resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey);
        }

        assertNoChanges(resumeCursor);
        resumeCursor.close();
    }

    assert.commandWorked(db.dropDatabase());
}

// Bring up a single-node replica set, run the test against its primary, then shut it down.
const replTest = new ReplSetTest({nodes: 1});
replTest.startSet();
replTest.initiate();

const primary = replTest.getPrimary();
runTest(primary);

replTest.stopSet();
})();