summaryrefslogtreecommitdiff
path: root/jstests/replsets/change_stream_pit_pre_image_deletion_asymmetric.js
blob: d014b3e0981756f6623119376f36a04ef607d4f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
 * Tests change stream point-in-time pre-images deletion replication to secondaries when primary
 * node state is not the same as of the secondary - the pre-image document to be deleted exists on
 * the primary node but does not exist on the secondary.
 *
 * @tags: [
 * requires_fcv_60,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/change_stream_util.js");  // For getPreImages().
load("jstests/libs/fail_point_util.js");
load('jstests/replsets/rslib.js');  // For getLatestOp, getFirstOplogEntry.

const oplogSizeMB = 1;
const replTest = new ReplSetTest({
    name: jsTestName(),
    nodes: [{}, {rsConfig: {priority: 0}}],
    nodeOptions: {
        setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})},
        oplogSize: oplogSizeMB
    }
});
replTest.startSet();
replTest.initiate();
const primaryNode = replTest.getPrimary();

const collectionName = "coll";
const testDB = primaryNode.getDB(jsTestName());

// Create a collection with change stream pre- and post-images enabled.
assert.commandWorked(
    testDB.createCollection(collectionName, {changeStreamPreAndPostImages: {enabled: true}}));
const coll = testDB[collectionName];

// Insert a document for the test.
assert.commandWorked(coll.insert({_id: 1, v: 1}, {writeConcern: {w: 2}}));

// Add a new node that will perform an initial sync. Pause the initial sync process (using
// failpoint "initialSyncHangBeforeCopyingDatabases") before copying the database to perform
// document modifications to make the collection content more recent and create inconsistent
// data situation during oplog application.
const initialSyncNode = replTest.add({
    rsConfig: {priority: 0},
    setParameter: {'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'})}
});

// Wait until the new node starts and pauses on the fail point.
replTest.reInitiate();
assert.commandWorked(initialSyncNode.adminCommand({
    waitForFailPoint: "initialSyncHangBeforeCopyingDatabases",
    timesEntered: 1,
    maxTimeMS: 60000
}));

// Update the document on the primary node.
assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));

// Resume the initial sync process.
assert.commandWorked(initialSyncNode.adminCommand(
    {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}));

// Wait until the initial sync process is complete and the new node becomes a fully
// functioning secondary.
replTest.waitForState(initialSyncNode, ReplSetTest.State.SECONDARY);

// Verify that pre-images were not written during the logical initial sync. At this point the
// pre-image collections in the nodes of the replica set are out of sync.
let preImageDocuments = getPreImages(initialSyncNode);
assert.eq(preImageDocuments.length, 0, preImageDocuments);

// Force deletion of all pre-images and ensure that this replicates to all nodes.
// Roll over all current oplog entries.
const lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
assert.neq(lastOplogEntryToBeRemoved, null);
const largeString = 'a'.repeat(256 * 1024);
const otherColl = primaryNode.getDB(jsTestName())["otherCollection"];

// Checks if the oplog has been rolled over from the timestamp of
// 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater
// than 'lastOplogEntryToBeRemoved'.
function oplogIsRolledOver() {
    return timestampCmp(lastOplogEntryToBeRemoved.ts,
                        getFirstOplogEntry(primaryNode, {readConcern: "majority"}).ts) <= 0;
}

while (!oplogIsRolledOver()) {
    // Insert a large document with a write concern that ensures that before proceeding the
    // operation gets replicated to all 3 nodes in the replica set, since, otherwise, the node that
    // is being initial synced may not be able to catchup due to a small size of the oplog.
    assert.commandWorked(otherColl.insert({long_str: largeString}, {writeConcern: {w: 3}}));
}

// Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' job deletes the expired pre-images
// (all).
assert.soon(() => {
    const preImages = getPreImages(primaryNode);
    return preImages.length == 0;
});

// Verify that all nodes get in sync and do not crash.
replTest.awaitReplication();
replTest.stopSet();
})();