path: root/jstests/noPassthrough/change_streams_pre_image_removal_job.js
// Tests that expired pre-images (pre-image timestamp older than oldest oplog entry timestamp) are
// removed from the pre-images collection via the 'PeriodicChangeStreamExpiredPreImagesRemover'
// periodic job.
// @tags: [
//  requires_fcv_60,
//  assumes_against_mongod_not_mongos,
//  change_stream_does_not_expect_txns,
//  requires_replication,
//  requires_majority_read_concern,
// ]
(function() {
"use strict";

load('jstests/replsets/rslib.js');                 // For getLatestOp, getFirstOplogEntry.
load("jstests/libs/collection_drop_recreate.js");  // For assertDropAndRecreateCollection.
load("jstests/libs/feature_flag_util.js");         // For FeatureFlagUtil.

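// Three successive versions of the same document (all share '_id: 12345'); each update from one
// version to the next records a pre-image.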
const docA = {
    _id: 12345,
    version: 1,
};
const docB = {
    _id: 12345,
    version: 2,
};
const docC = {
    _id: 12345,
    version: 3,
};
const preImagesCollectionDatabase = "config";
const preImagesCollectionName = "system.preimages";
const oplogSizeMB = 1;

// Set up the replica set with two nodes and two collections with 'changeStreamPreAndPostImages'
// enabled and run expired pre-image removal job every second.
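// Note: setting 'preImagesCollectionTruncateMarkersMinBytes' to 1 presumably forces a truncate
// marker to be created for (nearly) every pre-image, so truncate-based removal does not have to
// wait for a large marker to fill up before it can act.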
const rst = new ReplSetTest({nodes: 2, oplogSize: oplogSizeMB});
rst.startSet({
    setParameter: {
        expiredChangeStreamPreImageRemovalJobSleepSecs: 1,
        preImagesCollectionTruncateMarkersMinBytes: 1
    }
});
rst.initiate();
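// Each padding string is roughly 36KB ('abcdefghi' is 9 bytes, repeated 4096 times), so a modest
// number of inserts is enough to roll over the 1MB oplog.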
const largeStr = 'abcdefghi'.repeat(4 * 1024);
const primaryNode = rst.getPrimary();
const testDB = primaryNode.getDB(jsTestName());
const localDB = primaryNode.getDB("local");

// Returns documents from the pre-images collection from 'node'.
function getPreImages(node) {
    return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName].find().toArray();
}

// Checks whether the oplog has been rolled over past the timestamp of
// 'lastOplogEntryTsToBeRemoved', i.e. the timestamp of the first entry in the oplog is greater
// than or equal to 'lastOplogEntryTsToBeRemoved' on each node of the replica set.
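// Note: the shell helper 'timestampCmp(a, b)' returns a negative value, zero, or a positive value
// depending on whether 'a' is less than, equal to, or greater than 'b'.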
function oplogIsRolledOver(lastOplogEntryTsToBeRemoved) {
    return [primaryNode, rst.getSecondary()].every(
        (node) => timestampCmp(lastOplogEntryTsToBeRemoved,
                               getFirstOplogEntry(node, {readConcern: "majority"}).ts) <= 0);
}

// Invokes function 'func()' and returns the invocation result. If 'func()' throws an exception
// with error code CappedPositionLost, the call is retried until it succeeds or the default
// 'assert.soon()' timeout is reached, in which case 'message' is reported in the failure.
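// 'CappedPositionLost' is raised when a capped collection (here, the oplog) overwrites the
// position a collection-scan cursor was reading from, so the read is simply retried.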
function retryOnCappedPositionLostError(func, message) {
    let result;
    assert.soon(() => {
        try {
            result = func();
            return true;
        } catch (e) {
            if (e.code !== ErrorCodes.CappedPositionLost) {
                throw e;
            }
            jsTestLog(`Retrying on CappedPositionLost error: ${tojson(e)}`);
            return false;
        }
    }, message);
    return result;
}

// Tests that the pre-image removal job deletes only the expired pre-images. Four updates are
// performed, recording four pre-images, and the oplog is then rolled over, removing the oplog
// entries corresponding to those pre-images. Afterwards, two more updates are performed,
// recording two new pre-images. The removal job must delete only the first four recorded
// pre-images.
{
    // Roll over the oplog, leading to 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job
    // deleting all pre-images.
    let lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
    while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) {
        assert.commandWorked(
            testDB.tmp.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
    }
    assert.soon(() => getPreImages(primaryNode).length == 0);

    // Drop and recreate the collections with pre-images recording.
    const collA = assertDropAndRecreateCollection(
        testDB, "collA", {changeStreamPreAndPostImages: {enabled: true}});
    const collB = assertDropAndRecreateCollection(
        testDB, "collB", {changeStreamPreAndPostImages: {enabled: true}});

    // Perform insert and update operations.
    for (const coll of [collA, collB]) {
        assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}}));
        assert.commandWorked(coll.update(docA, {$inc: {version: 1}}));
        assert.commandWorked(coll.update(docB, {$inc: {version: 1}}));
    }

    // The pre-images collection should contain four pre-images: one for each of the two updates
    // performed on each of the two collections.
    let preImages = getPreImages(primaryNode);
    const preImagesToExpire = 4;
    assert.eq(preImages.length, preImagesToExpire, preImages);

    // Roll over all current oplog entries.
    lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
    assert.neq(lastOplogEntryToBeRemoved, null);

    // Insert large documents until the oplog has rolled over past the timestamp of
    // 'lastOplogEntryToBeRemoved', so that all previously recorded pre-images become expired.
    while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) {
        assert.commandWorked(collA.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
    }

    // Perform update operations that insert new pre-images that are not expired yet.
    for (const coll of [collA, collB]) {
        assert.commandWorked(coll.update(docC, {$inc: {version: 1}}));
    }

    // Wait until the 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job has deleted the
    // expired pre-images.
    assert.soon(() => {
        // Only the two most recently recorded pre-images should remain, as their timestamps are
        // greater than the oldest oplog entry timestamp.
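        // Each pre-image document's '_id' embeds the commit timestamp ('ts') of the operation
        // that recorded it, which is what gets compared against the oldest oplog entry timestamp.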
        preImages = getPreImages(primaryNode);
        const onlyTwoPreImagesLeft = preImages.length == 2;
        const allPreImagesHaveBiggerTimestamp = preImages.every(
            preImage => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1);
        return onlyTwoPreImagesLeft && allPreImagesHaveBiggerTimestamp;
    });

    // When the 'UseUnreplicatedTruncatesForDeletions' feature flag is enabled, pre-images are
    // removed with unreplicated truncates rather than batched deletes, and, since truncates are
    // not replicated, the number of pre-images on the primary may differ from that on the
    // secondary. The following checks therefore only apply when the flag is disabled.
    if (!FeatureFlagUtil.isPresentAndEnabled(testDB, "UseUnreplicatedTruncatesForDeletions")) {
        // Because the pre-images collection is implicitly replicated, validate that writes do not
        // generate oplog entries, with the exception of deletions.
        const preimagesNs = 'config.system.preimages';
        // Multi-deletes are batched based on time before performing the deletion, therefore the
        // deleted pre-images can span multiple 'applyOps' oplog entries.
        //
        // As the pre-images originate from two collections and the range-deletion is performed
        // per collection, the minimum number of batches is 2. The maximum number of batches is 4
        // (one per removed pre-image).
        const expectedNumberOfBatchesRange = [2, 3, 4];
        const serverStatusBatches = testDB.serverStatus()['batchedDeletes']['batches'];
        const serverStatusDocs = testDB.serverStatus()['batchedDeletes']['docs'];
        assert.contains(serverStatusBatches, expectedNumberOfBatchesRange);
        assert.eq(serverStatusDocs, preImagesToExpire);
        assert.contains(
            retryOnCappedPositionLostError(
                () =>
                    localDB.oplog.rs
                        .find(
                            {ns: 'admin.$cmd', 'o.applyOps.op': 'd', 'o.applyOps.ns': preimagesNs})
                        .itcount(),
                "Failed to fetch oplog entries for pre-image deletes"),
            expectedNumberOfBatchesRange);
        assert.eq(0,
                  retryOnCappedPositionLostError(
                      () => localDB.oplog.rs.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount(),
                      "Failed to fetch all oplog entries except pre-image deletes"));

        // Verify that pre-images collection content on the primary node is the same as on the
        // secondary.
        rst.awaitReplication();
        assert(bsonWoCompare(getPreImages(primaryNode), getPreImages(rst.getSecondary())) === 0);
    }
}

// Increase the oplog size on each node to prevent oplog entries from being deleted, which removes
// the risk of a replica set consistency check failure during teardown of the replica set.
const largeOplogSizeMB = 1000;
rst.nodes.forEach((node) => {
    assert.commandWorked(node.adminCommand({replSetResizeOplog: 1, size: largeOplogSizeMB}));
});

rst.stopSet();
}());