1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
/**
* Verifies change streams operation for batched writes.
*
* @tags: [
* requires_fcv_61,
* requires_replication,
* requires_majority_read_concern,
* uses_change_streams,
* ]
*/
(function() {
"use strict";
const dbName = "test";
const collName = "c";
/**
* Asserts that the expected operation type and documentKey are found on the change stream
* cursor. Returns the change stream document.
*/
function assertWriteVisible(cursor, operationType, documentKey) {
assert.soon(() => cursor.hasNext());
const changeDoc = cursor.next();
assert.eq(operationType, changeDoc.operationType, changeDoc);
assert.eq(documentKey, changeDoc.documentKey, changeDoc);
// Change stream events for batched writes do not include lsid and txnNumber.
assert(!changeDoc.hasOwnProperty('lsid'));
assert(!changeDoc.hasOwnProperty('txnNumber'));
return changeDoc;
}
/**
* Asserts that the expected operation type and documentKey are found on the change stream
* cursor. Pushes the corresponding resume token and change stream document to an array.
*/
function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) {
const changeDoc = assertWriteVisible(cursor, operationType, documentKey);
changeList.push(changeDoc);
}
/**
* Asserts that there are no changes waiting on the change stream cursor.
*/
function assertNoChanges(cursor) {
assert(!cursor.hasNext(), () => {
return "Unexpected change set: " + tojson(cursor.toArray());
});
}
function runTest(conn) {
const db = conn.getDB(dbName);
const coll = db.getCollection(collName);
const docsPerBatch = 3;
const totalNumDocs = 8;
let changeList = [];
// For consistent results, disable any batch targeting except for
// 'batchedDeletesTargetBatchDocs'.
assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0}));
assert.commandWorked(
db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
// Populate the collection, then open a change stream, then mass-delete the collection.
assert.commandWorked(coll.insertMany(
[...Array(totalNumDocs).keys()].map(x => ({_id: x, txt: "a" + x})), {ordered: false}));
const changeStreamCursor = coll.watch();
const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches'];
const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs'];
assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
assert.eq(0, coll.find().itcount());
const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches'];
const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs'];
assert.eq(serverStatusBatchesAfter,
serverStatusBatchesBefore + Math.ceil(totalNumDocs / docsPerBatch));
assert.eq(serverStatusDocsAfter, serverStatusDocsBefore + totalNumDocs);
// Verify the change stream emits events for the batched deletion, and capture the events so we
// can test resumability later.
for (let docKey = 0; docKey < totalNumDocs; docKey++) {
assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList);
}
assertNoChanges(changeStreamCursor);
changeStreamCursor.close();
// Test that change stream resume returns the expected set of documents at each point
// captured by this test.
for (let i = 0; i < changeList.length; ++i) {
const resumeCursor = coll.watch([], {startAfter: changeList[i]._id});
for (let x = (i + 1); x < changeList.length; ++x) {
const expectedChangeDoc = changeList[x];
assertWriteVisible(
resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey);
}
assertNoChanges(resumeCursor);
resumeCursor.close();
}
assert.commandWorked(db.dropDatabase());
}
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
runTest(rst.getPrimary());
rst.stopSet();
})();
|