1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
/**
* Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume
* token with each document and returning the postBatchResumeToken as soon as each batch is
* exhausted.
* @tags: [
* requires_replication,
* requires_majority_read_concern,
* ]
*/
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
// Create a new single-node replica set, and ensure that it can support $changeStream.
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const db = rst.getPrimary().getDB(jsTestName());
const collName = "change_stream_shell_helper_resume_token";
const csCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
const batchSize = 5;
let docId = 0;
// Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received.
const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}});
assert(!csCursor.hasNext());
let curResumeToken = csCursor.getResumeToken();
assert.neq(undefined, curResumeToken);
// Test that advancing the oplog time updates the postBatchResumeToken, even with no results.
assert.commandWorked(otherCollection.insert({}));
let prevResumeToken = curResumeToken;
assert.soon(() => {
assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
assert.neq(undefined, curResumeToken);
return bsonWoCompare(curResumeToken, prevResumeToken) > 0;
});
// Insert 9 documents into the collection, followed by a write to the unrelated collection.
for (let i = 0; i < 9; ++i) {
assert.commandWorked(csCollection.insert({_id: ++docId}));
}
assert.commandWorked(otherCollection.insert({}));
// Retrieve the first batch of events from the cursor.
assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// We have not yet iterated any of the events. Verify that the resume token is unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
// For each event in the first batch, the resume token should match the document's _id.
let currentDoc = null;
while (csCursor.objsLeftInBatch()) {
currentDoc = csCursor.next();
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
// Retrieve the second batch of events from the cursor.
assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
// We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged.
assert.docEq(curResumeToken, csCursor.getResumeToken());
// For all but the final event, the resume token should match the document's _id.
while ((currentDoc = csCursor.next()).fullDocument._id < docId) {
assert.soon(() => csCursor.hasNext());
prevResumeToken = curResumeToken;
curResumeToken = csCursor.getResumeToken();
assert.docEq(curResumeToken, currentDoc._id);
assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
}
// When we reach here, 'currentDoc' is the final document in the batch, but we have not yet
// updated the resume token. Assert that this resume token sorts before currentDoc's.
prevResumeToken = curResumeToken;
assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0);
// After we have pulled the final document out of the cursor, the resume token should be the
// postBatchResumeToken rather than the document's _id. Because we inserted an item into the
// unrelated collection to push the oplog past the final event returned by the change stream,
// this will be strictly greater than the final document's _id.
assert.soon(() => {
curResumeToken = csCursor.getResumeToken();
assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
return bsonWoCompare(curResumeToken, currentDoc._id) > 0;
});
rst.stopSet();
}());
|