summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_oplog_rollover.js
blob: 29bb2a15e420d02b391f3256b43397d0f292802b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Tests the behaviour of change streams on an oplog which rolls over.
// @tags: [
//   requires_find_command,
//   requires_journaling,
//   requires_majority_read_concern,
//   uses_change_streams,
// ]
(function() {
"use strict";

load('jstests/replsets/rslib.js');           // For getLatestOp, getFirstOplogEntry.
load('jstests/libs/change_stream_util.js');  // For ChangeStreamTest.

const oplogSize = 1;  // size in MB
const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize});

rst.startSet();
rst.initiate();

const testDB = rst.getPrimary().getDB(jsTestName());
const testColl = testDB[jsTestName()];

const cst = new ChangeStreamTest(testDB);

// Write a document to the test collection.
assert.commandWorked(testColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));

let changeStream = cst.startWatchingChanges(
    {pipeline: [{$changeStream: {}}], collection: testColl.getName(), includeToken: true});

// We awaited the replication of the insert, so the change stream shouldn't return them.
assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));

// Record current time to resume a change stream later in the test.
const resumeTimeFirstUpdate = testDB.runCommand({hello: 1}).$clusterTime.clusterTime;

assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));

// Test that we see the the update, and remember its resume tokens.
let next = cst.getOneChange(changeStream);
assert.eq(next.operationType, "update");
assert.eq(next.documentKey._id, 1);
const resumeTokenFromFirstUpdate = next._id;

// Write some additional documents, then test we can resume after the first update.
assert.commandWorked(testColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
assert.commandWorked(testColl.insert({_id: 3}, {writeConcern: {w: "majority"}}));

changeStream = cst.startWatchingChanges({
    pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
    aggregateOptions: {cursor: {batchSize: 0}},
    collection: testColl.getName()
});

for (let nextExpectedId of [2, 3]) {
    assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId);
}

// Test that the change stream can see additional inserts into the collection.
assert.commandWorked(testColl.insert({_id: 4}, {writeConcern: {w: "majority"}}));
assert.commandWorked(testColl.insert({_id: 5}, {writeConcern: {w: "majority"}}));

for (let nextExpectedId of [4, 5]) {
    assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId);
}

// Confirm that we can begin a stream at a timestamp that precedes the start of the oplog, if
// the first entry in the oplog is the replica set initialization message.
const firstOplogEntry = getFirstOplogEntry(rst.getPrimary());
assert.eq(firstOplogEntry.o.msg, "initiating set");
assert.eq(firstOplogEntry.op, "n");

const startAtDawnOfTimeStream = cst.startWatchingChanges({
    pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}],
    aggregateOptions: {cursor: {batchSize: 0}},
    collection: testColl.getName()
});

// The first entry we see should be the initial insert into the collection.
const firstStreamEntry = cst.getOneChange(startAtDawnOfTimeStream);
assert.eq(firstStreamEntry.operationType, "insert");
assert.eq(firstStreamEntry.documentKey._id, 1);

// Test that the stream can't resume if the resume token is no longer present in the oplog.

// Roll over the entire oplog such that none of the events are still present.
const primaryNode = rst.getPrimary();
const mostRecentOplogEntry = getLatestOp(primaryNode);
assert.neq(mostRecentOplogEntry, null);
const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');

function oplogIsRolledOver() {
    // The oplog has rolled over if the op that used to be newest is now older than the
    // oplog's current oldest entry. Said another way, the oplog is rolled over when
    // everything in the oplog is newer than what used to be the newest entry.
    return bsonWoCompare(mostRecentOplogEntry.ts,
                         getFirstOplogEntry(primaryNode, {readConcern: "majority"}).ts) < 0;
}

while (!oplogIsRolledOver()) {
    assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
}

// Confirm that attempting to continue reading an existing change stream throws CappedPositionLost.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: startAtDawnOfTimeStream.id, collection: testColl.getName()}),
    ErrorCodes.CappedPositionLost);

// Now confirm that attempting to resumeAfter or startAtOperationTime fails.
ChangeStreamTest.assertChangeStreamThrowsCode({
    db: testDB,
    collName: testColl.getName(),
    pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
    expectedCode: ErrorCodes.ChangeStreamHistoryLost
});

ChangeStreamTest.assertChangeStreamThrowsCode({
    db: testDB,
    collName: testColl.getName(),
    pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}],
    expectedCode: ErrorCodes.ChangeStreamHistoryLost
});

// We also can't start a stream from the "dawn of time" any more, since the first entry in the
// oplog is no longer the replica set initialization message.
ChangeStreamTest.assertChangeStreamThrowsCode({
    db: testDB,
    collName: testColl.getName(),
    pipeline: [{$changeStream: {startAtOperationTime: Timestamp(1, 1)}}],
    expectedCode: ErrorCodes.ChangeStreamHistoryLost
});

cst.cleanUp();
rst.stopSet();
})();