summaryrefslogtreecommitdiff
path: root/jstests/change_streams/start_at_cluster_time.js
blob: 77d85514016246690093db05290ace7523f8bcda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Tests resuming change streams based on cluster time.
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.

const coll = assertDropAndRecreateCollection(db, jsTestName());

const testStartTime = db.runCommand({hello: 1}).$clusterTime.clusterTime;

// Write a document to each chunk, and wait for replication.
assert.commandWorked(coll.insert({_id: -1}, {writeConcern: {w: "majority"}}));
assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Perform two updates, then use a change stream to capture the cluster time of the first update
// to be resumed from.
const streamToFindClusterTime = coll.watch();
assert.commandWorked(coll.update({_id: -1}, {$set: {updated: true}}));
assert.commandWorked(coll.update({_id: 1}, {$set: {updated: true}}));
assert.soon(() => streamToFindClusterTime.hasNext());
let next = streamToFindClusterTime.next();
assert.eq(next.operationType, "update");
assert.eq(next.documentKey, {_id: -1});
const timeOfFirstUpdate = next.clusterTime;

let changeStream = coll.watch([], {startAtOperationTime: timeOfFirstUpdate});

// Test that starting at the cluster time is inclusive of the first update, so we should see
// both updates in the new stream.
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update", tojson(next));
assert.eq(next.documentKey._id, -1, tojson(next));

assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update", tojson(next));
assert.eq(next.documentKey._id, 1, tojson(next));

// Test that startAtOperationTime is not allowed alongside resumeAfter.
assert.commandFailedWithCode(db.runCommand({
    aggregate: coll.getName(),
    pipeline: [{$changeStream: {startAtOperationTime: timeOfFirstUpdate, resumeAfter: next._id}}],
    cursor: {}
}),
                             40674);

// Test that resuming from a time in the future will wait for that time to come.
let resumeTimeFarFuture = db.runCommand({hello: 1}).$clusterTime.clusterTime;
resumeTimeFarFuture =
    new Timestamp(resumeTimeFarFuture.getTime() + 60 * 60 * 6, 1);  // 6 hours in the future

let changeStreamFuture = coll.watch([], {startAtOperationTime: resumeTimeFarFuture});

// Resume the change stream from the start of the test and verify it picks up the changes to the
// collection. Namely, it should see two inserts followed by two updates.
changeStream = coll.watch([], {startAtOperationTime: testStartTime});
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "insert", tojson(next));
assert.eq(next.documentKey._id, -1, tojson(next));

assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "insert", tojson(next));
assert.eq(next.documentKey._id, 1, tojson(next));

assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update", tojson(next));
assert.eq(next.documentKey._id, -1, tojson(next));

assert.soon(() => changeStream.hasNext());
next = changeStream.next();
assert.eq(next.operationType, "update", tojson(next));
assert.eq(next.documentKey._id, 1, tojson(next));

// Verify that the change stream resumed from far into the future does not see any changes.
assert(!changeStreamFuture.hasNext());
})();