// path: root/jstests/noPassthrough/change_streams_pre_image_removal_job.js
// blob: 98282febf4b789daee7b0ccbd2d36fbc98438913 (plain)
// Tests that expired pre-images (pre-image timestamp older than oldest oplog entry timestamp) are
// removed from the pre-images collection via the 'PeriodicChangeStreamExpiredPreImagesRemover'
// periodic job.
// @tags: [
//  requires_fcv_60,
//  featureFlagChangeStreamPreAndPostImages,
//  assumes_against_mongod_not_mongos,
//  change_stream_does_not_expect_txns,
//  requires_replication,
//  requires_majority_read_concern,
// ]
(function() {
"use strict";

load('jstests/replsets/rslib.js');                 // For getLatestOp, getFirstOplogEntry.
load("jstests/libs/collection_drop_recreate.js");  // For assertCreateCollection.

// Builds the test document (fixed '_id' of 12345) as it looks at the given 'version'.
const docAtVersion = (version) => ({_id: 12345, version});

// The same document at three consecutive versions. Each one is used as the filter of an update
// that increments 'version'; 'docA' is also the document that gets inserted.
const docA = docAtVersion(1);
const docB = docAtVersion(2);
const docC = docAtVersion(3);

// Database and collection names of the pre-images collection.
const preImagesCollectionDatabase = "config";
const preImagesCollectionName = "system.preimages";

// Oplog size, in megabytes, passed to the replica set.
const oplogSizeMB = 1;

// Reads every document of the pre-images collection from 'node' and returns them as an array.
function getPreImages(node) {
    const preImagesDB = node.getDB(preImagesCollectionDatabase);
    const preImagesColl = preImagesDB[preImagesCollectionName];
    const cursor = preImagesColl.find();
    return cursor.toArray();
}

// Start a two-node replica set with the expired pre-image removal job sleep interval set to one
// second, then create two collections that have 'changeStreamPreAndPostImages' enabled.
const replSetOptions = {
    nodes: 2,
    oplogSize: oplogSizeMB
};
const nodeOptions = {
    setParameter: {expiredChangeStreamPreImageRemovalJobSleepSecs: 1}
};
const rst = new ReplSetTest(replSetOptions);
rst.startSet(nodeOptions);
rst.initiate();

// Handles used by the rest of the test: the primary, its "local" database (which holds the
// oplog) and the database named after this test.
const primaryNode = rst.getPrimary();
const localDB = primaryNode.getDB("local");
const testDB = primaryNode.getDB(jsTestName());
const [collA, collB] = ["collA", "collB"].map(
    (collName) => assertCreateCollection(
        testDB, collName, {changeStreamPreAndPostImages: {enabled: true}}));

// Before any update is performed, the pre-images collection holds no documents.
let preImages = getPreImages(primaryNode);
assert.eq(preImages.length, 0, preImages);

// In each collection, insert the document with majority write concern and then update it twice,
// filtering first by its version 1 form and then by its version 2 form.
[collA, collB].forEach((coll) => {
    assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}}));
    for (const docToUpdate of [docA, docB]) {
        assert.commandWorked(coll.update(docToUpdate, {$inc: {version: 1}}));
    }
});

// Two updates in each of the two collections: four pre-images are expected. These are the
// pre-images that are expected to expire later in the test.
const preImagesToExpire = 4;
preImages = getPreImages(primaryNode);
assert.eq(preImages.length, preImagesToExpire, preImages);

// Preparation for rolling over all current oplog entries: build the large string that is used as
// the payload of the filler inserts, and remember the latest oplog entry of the primary, which
// must exist.
const largeStr = 'abcdefghi'.repeat(4 * 1024);
const lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
assert.neq(lastOplogEntryToBeRemoved, null);

// Checks if the oplog has been rolled over from the timestamp of 'lastOplogEntryToBeRemoved' on
// each node of the replica set, i.e. whether the timestamp of the first entry in the oplog of both
// the primary and the secondary is strictly greater than the timestamp of
// 'lastOplogEntryToBeRemoved'.
//
// Equal timestamps do not count as rolled over: in that case the first oplog entry still carries
// the timestamp of 'lastOplogEntryToBeRemoved', so that entry has not been removed yet.
//
// Returns true only if the condition holds on every node.
function oplogIsRolledOver() {
    const nodes = [primaryNode, rst.getSecondary()];
    return nodes.every((node) => {
        // The first oplog entry of each node is read with "majority" read concern.
        const firstOplogEntry = getFirstOplogEntry(node, {readConcern: "majority"});
        return timestampCmp(lastOplogEntryToBeRemoved.ts, firstOplogEntry.ts) < 0;
    });
}

// Keep inserting large documents into 'collA', each with majority write concern, until
// 'oplogIsRolledOver' reports that the oplog has been rolled over.
while (!oplogIsRolledOver()) {
    const fillerDoc = {long_str: largeStr};
    assert.commandWorked(collA.insert(fillerDoc, {writeConcern: {w: "majority"}}));
}

// Update the document once more in each collection. These updates insert new pre-images that are
// not expired yet.
[collA, collB].forEach((coll) => {
    assert.commandWorked(coll.update(docC, {$inc: {version: 1}}));
});

// Wait for the 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job to delete the expired
// pre-images. The wait is over once exactly two pre-images remain and each of them has a
// timestamp greater than the timestamp of 'lastOplogEntryToBeRemoved'.
assert.soon(() => {
    preImages = getPreImages(primaryNode);

    // Expired pre-images are still present (or too many were removed): keep waiting.
    if (preImages.length !== 2) {
        return false;
    }

    // The remaining pre-images must be newer than the rolled-over part of the oplog.
    return preImages.every(
        (preImage) => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1);
});

// The pre-images collection is implicitly replicated, so writes to it must not generate oplog
// entries, with the exception of deletions: the primary's oplog is expected to contain one delete
// entry per expired pre-image and no other entry for the pre-images namespace.
const preimagesNs = 'config.system.preimages';
const oplogColl = localDB.oplog.rs;

const deleteEntryCount = oplogColl.find({op: 'd', ns: preimagesNs}).itcount();
assert.eq(preImagesToExpire, deleteEntryCount);

const nonDeleteEntryCount = oplogColl.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount();
assert.eq(0, nonDeleteEntryCount);

// After replication has caught up, the content of the pre-images collection on the primary must
// be the same as on the secondary.
rst.awaitReplication();
const primaryPreImages = getPreImages(primaryNode);
const secondaryPreImages = getPreImages(rst.getSecondary());
assert(bsonWoCompare(primaryPreImages, secondaryPreImages) === 0);

// Before shutting down, resize the oplog of every node to a much larger size so that oplog
// entries stop being deleted; this removes a risk of the replica set consistency check failing
// during tear down.
const largeOplogSizeMB = 1000;
for (const node of rst.nodes) {
    assert.commandWorked(node.adminCommand({replSetResizeOplog: 1, size: largeOplogSizeMB}));
}

rst.stopSet();
}());