summaryrefslogtreecommitdiff
path: root/jstests/serverless/initial_sync_change_collection.js
blob: 3d111307dd96e0e1d581e95a868c30f074e80468 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Tests that the data cloning phase of initial sync does not clone the change collection
// documents, and that once the initial sync has completed the change collection entries and the
// oplog entries are exactly the same on the new secondary.
// @tags: [
//   requires_fcv_62,
//   __TEMPORARILY_DISABLED__
// ]
//
(function() {
"use strict";

load("jstests/libs/fail_point_util.js");                    // For waitForFailPoint.
load("jstests/serverless/libs/change_collection_util.js");  // For verifyChangeCollectionEntries.

// Start with a single node. The node under test is added later, after data has already been
// written to the primary.
const replSetTest = new ReplSetTest({nodes: 1});

// TODO SERVER-67267 Add 'serverless' flag.
// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__'
// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'.
replSetTest.startSet({
    setParameter: {
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagMongoStore: true
    }
});

replSetTest.initiate();

const primary = replSetTest.getPrimary();

// Enable the change stream to create the change collection.
assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));

// Handle to the primary's change collection; used below to confirm that each insert was recorded
// on the primary before the secondary is inspected.
const primaryChangeColl = primary.getDB("config").system.change_collection;

const mdbStockPriceDoc = {
    _id: "mdb",
    price: 250
};

// The document 'mdbStockPriceDoc' is inserted before starting the initial sync. As such, the
// change collection entry for 'mdbStockPriceDoc' should not be cloned to the secondary by the
// initial sync.
assert.commandWorked(primary.getDB("test").stockPrice.insert(mdbStockPriceDoc));
// Sanity check: the primary's change collection holds exactly one entry for this insert.
assert.eq(primaryChangeColl.find({o: mdbStockPriceDoc}).toArray().length, 1);

// Add a new secondary to the replica set and block the initial sync after the data cloning is done.
// The new node is started with the same three feature parameters that were passed to 'startSet'
// above, plus the failpoint.
const secondary = replSetTest.add({
    setParameter: {
        // Hang after the data cloning phase is completed.
        "failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}),
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagMongoStore: true
    }
});

replSetTest.reInitiate();

// Wait for the cloning phase to complete. The cloning phase should not clone documents of the
// change collection from the primary.
// NOTE(review): 'kDefaultWaitForFailPointTimeout' is not defined in this file; presumably it comes
// from 'fail_point_util.js' loaded above - confirm.
assert.commandWorked(secondary.adminCommand({
    waitForFailPoint: "initialSyncHangAfterDataCloning",
    timesEntered: 1,
    maxTimeMS: kDefaultWaitForFailPointTimeout
}));

const tslaStockPriceDoc = {
    _id: "tsla",
    price: 650
};

// The document 'tslaStockPriceDoc' is inserted in the primary after the data cloning phase has
// completed, as such this should be inserted in the secondary's change collection.
assert.commandWorked(primary.getDB("test").stockPrice.insert(tslaStockPriceDoc));
// Sanity check: the primary's change collection holds exactly one entry for this insert.
assert.eq(primaryChangeColl.find({o: tslaStockPriceDoc}).toArray().length, 1);

// Unblock the initial sync process.
// NOTE(review): this call goes through a DB handle ('getDB("test").adminCommand'), whereas the
// 'waitForFailPoint' command above is issued directly on the connection; presumably both are
// routed to the 'admin' database - confirm.
assert.commandWorked(secondary.getDB("test").adminCommand(
    {configureFailPoint: "initialSyncHangAfterDataCloning", mode: "off"}));

// Wait for the initial sync to complete.
replSetTest.waitForState(secondary, ReplSetTest.State.SECONDARY);

// Verify that the document 'mdbStockPriceDoc' does not exist and the document 'tslaStockPriceDoc'
// exists in the secondary's change collection. The query matches either document, so exactly one
// result that equals 'tslaStockPriceDoc' establishes both facts.
const changeCollDocs =
    secondary.getDB("config")
        .system.change_collection.find({$or: [{o: mdbStockPriceDoc}, {o: tslaStockPriceDoc}]})
        .toArray();
assert.eq(changeCollDocs.length, 1);
assert.eq(changeCollDocs[0].o, tslaStockPriceDoc);

// Get the timestamp of the first and the last entry from the secondary's oplog.
// NOTE(review): the find() has no explicit sort, so this relies on the entries being returned in
// oplog order for the first/last elements to be the oldest/newest - confirm.
const oplogDocs = secondary.getDB("local").oplog.rs.find().toArray();
// Guard against an empty oplog before indexing into the array below.
assert.gt(oplogDocs.length, 0);
const startOplogTimestamp = oplogDocs[0].ts;
const endOplogTimestamp = oplogDocs.at(-1).ts;

// The change collection gets created at the data cloning phase and documents are written to the
// oplog only after the data cloning is done. And so, the change collection already exists in place
// to capture all oplog entries. As such, the change collection entries and the oplog entries from
// the 'startOplogTimestamp' to the 'endOplogTimestamp' must be exactly the same.
verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);

// The state of the change collection after the initial sync is not consistent with the primary.
// This is because the change collection's data is never cloned to the secondary, only its creation
// is cloned. As such, we will skip the db hash check on the change collection.
replSetTest.stopSet(undefined /* signal */, undefined /* forRestart */, {skipCheckDBHashes: true});
})();