1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
// Tests resuming change streams based on cluster time.
(function() {
"use strict";

load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.

/**
 * Waits until 'stream' has an event available, consumes it, and asserts that the event has the
 * expected operation type and 'documentKey._id'. The full event is included in any failure
 * message.
 *
 * @param stream - a change stream cursor as returned by coll.watch().
 * @param expectedOpType - the expected value of the event's 'operationType' field.
 * @param expectedId - the expected value of the event's 'documentKey._id' field.
 * @returns the consumed event, so callers can read further fields from it (e.g. 'clusterTime'
 *     or the '_id' resume token).
 */
function assertNextChange(stream, expectedOpType, expectedId) {
    assert.soon(() => stream.hasNext());
    const change = stream.next();
    assert.eq(change.operationType, expectedOpType, tojson(change));
    assert.eq(change.documentKey._id, expectedId, tojson(change));
    return change;
}

const coll = assertDropAndRecreateCollection(db, jsTestName());

// Captured before any writes so that a stream started at this time sees every change below.
const testStartTime = db.runCommand({hello: 1}).$clusterTime.clusterTime;

// Insert two documents and wait for them to be majority-committed.
// NOTE(review): the previous comment said "to each chunk"; presumably that refers to running
// against a sharded collection split around _id: 0 - confirm against the suite's setup.
assert.commandWorked(coll.insert({_id: -1}, {writeConcern: {w: "majority"}}));
assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Perform two updates, then use a change stream to capture the cluster time of the first update
// to be resumed from.
const streamToFindClusterTime = coll.watch();
assert.commandWorked(coll.update({_id: -1}, {$set: {updated: true}}));
assert.commandWorked(coll.update({_id: 1}, {$set: {updated: true}}));

const firstUpdate = assertNextChange(streamToFindClusterTime, "update", -1);
// Keep the original, stricter check that the document key consists of exactly {_id: -1}.
assert.eq(firstUpdate.documentKey, {_id: -1}, tojson(firstUpdate));
const timeOfFirstUpdate = firstUpdate.clusterTime;

// Test that starting at the cluster time is inclusive of the first update, so we should see
// both updates in the new stream.
const streamFromFirstUpdate = coll.watch([], {startAtOperationTime: timeOfFirstUpdate});
assertNextChange(streamFromFirstUpdate, "update", -1);
const secondUpdate = assertNextChange(streamFromFirstUpdate, "update", 1);

// Test that startAtOperationTime is not allowed alongside resumeAfter.
assert.commandFailedWithCode(db.runCommand({
    aggregate: coll.getName(),
    pipeline: [{
        $changeStream: {startAtOperationTime: timeOfFirstUpdate, resumeAfter: secondUpdate._id}
    }],
    cursor: {}
}),
                             40674);

// Test that resuming from a time in the future will wait for that time to come. The stream is
// opened here and only checked at the very end, after all other streams have been consumed.
const currentClusterTime = db.runCommand({hello: 1}).$clusterTime.clusterTime;
const resumeTimeFarFuture =
    new Timestamp(currentClusterTime.getTime() + 60 * 60 * 6, 1);  // 6 hours in the future
const changeStreamFuture = coll.watch([], {startAtOperationTime: resumeTimeFarFuture});

// Resume the change stream from the start of the test and verify it picks up the changes to the
// collection. Namely, it should see two inserts followed by two updates.
const streamFromTestStart = coll.watch([], {startAtOperationTime: testStartTime});
assertNextChange(streamFromTestStart, "insert", -1);
assertNextChange(streamFromTestStart, "insert", 1);
assertNextChange(streamFromTestStart, "update", -1);
assertNextChange(streamFromTestStart, "update", 1);

// Verify that the change stream resumed from far into the future does not see any changes.
assert(!changeStreamFuture.hasNext());

// Release the server-side cursors explicitly rather than leaving them open after the test.
streamToFindClusterTime.close();
streamFromFirstUpdate.close();
streamFromTestStart.close();
changeStreamFuture.close();
})();
|