summaryrefslogtreecommitdiff
path: root/jstests/serverless/write_to_change_collection_in_startup_recovery.js
blob: bbf7cd0a259d616762021ba2268729671a67fe5c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Tests that replaying the oplog entries during the startup recovery also writes to the change
// collection.
// @tags: [
//   requires_fcv_62,
//   __TEMPORARILY_DISABLED__
// ]

(function() {
"use strict";

load("jstests/libs/fail_point_util.js");                    // For configureFailPoint.
load("jstests/serverless/libs/change_collection_util.js");  // For verifyChangeCollectionEntries.

// The document inserted while checkpointing is paused, and the filters used to look it up in the
// user collection and in the oplog / change collection respectively.
const insertedDoc = {
    _id: "mdb",
    price: 250
};
const userDocFilter = {
    _id: "mdb",
    price: 250
};
const insertEntryFilter = {
    ns: "test.stockPrice",
    o: {_id: "mdb", price: 250}
};

// Returns a fresh copy of the server parameters used both for the initial start and for the
// restart of the node. A new object is built on every call so that the two start-ups never share
// (and therefore can never observe each other's modifications of) the same options object.
function makeServerParameters() {
    return {
        featureFlagServerlessChangeStreams: true,
        multitenancySupport: true,
        featureFlagMongoStore: true
    };
}

// Returns the number of documents in the collection 'coll' that match 'filter'.
function countMatching(coll, filter) {
    return coll.find(filter).toArray().length;
}

const replSetTest = new ReplSetTest({nodes: 1});

// TODO SERVER-67267 Add 'serverless' flag.
// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__'
// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'.
replSetTest.startSet({setParameter: makeServerParameters()});

replSetTest.initiate();

let primary = replSetTest.getPrimary();

// Enable the change stream to create the change collection.
assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));

// Insert a document to the collection and then capture the corresponding oplog timestamp. This
// timestamp will be the start timestamp beyond (inclusive) which we will validate the oplog and the
// change collection entries.
const seedInsertResult = assert.commandWorked(primary.getDB("test").runCommand(
    {insert: "seedCollection", documents: [{_id: "beginTs"}]}));
const startTimestamp = seedInsertResult.operationTime;

// Pause the checkpointing so that non-journaled collections, including the change collection,
// will not be persisted.
const pauseCheckpointThreadFailPoint = configureFailPoint(primary, "pauseCheckpointThread");
pauseCheckpointThreadFailPoint.wait();

// Insert a document to the collection.
assert.commandWorked(primary.getDB("test").stockPrice.insert(insertedDoc));

// Verify that the inserted document can be queried from the 'stockPrice', the 'oplog.rs', and
// the 'system.change_collection'.
assert.eq(countMatching(primary.getDB("test").stockPrice, userDocFilter),
          1,
          "Expected the document in 'stockPrice' before the shutdown");
assert.eq(countMatching(primary.getDB("local").oplog.rs, insertEntryFilter),
          1,
          "Expected the insert entry in 'oplog.rs' before the shutdown");
assert.eq(countMatching(primary.getDB("config").system.change_collection, insertEntryFilter),
          1,
          "Expected the insert entry in 'system.change_collection' before the shutdown");

// Perform ungraceful shutdown of the primary node and do not clean the db path directory.
replSetTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});

// Run a new mongoD instance with db path pointing to the replica set primary db directory.
const standalone =
    MongoRunner.runMongod({dbpath: primary.dbpath, noReplSet: true, noCleanData: true});
assert.neq(null, standalone, "Fail to restart the node as standalone");

// Verify that the inserted document exists in neither the 'stockPrice' nor the
// 'system.change_collection', but does exist in the 'oplog.rs'.
assert.eq(countMatching(standalone.getDB("test").stockPrice, userDocFilter),
          0,
          "Expected no document in 'stockPrice' on the standalone");
assert.eq(countMatching(standalone.getDB("local").oplog.rs, insertEntryFilter),
          1,
          "Expected the insert entry in 'oplog.rs' on the standalone");
assert.eq(countMatching(standalone.getDB("config").system.change_collection, insertEntryFilter),
          0,
          "Expected no insert entry in 'system.change_collection' on the standalone");

// Stop the mongoD instance and do not clean the db directory.
MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: true, wait: true});

// Start the replica set primary with the same db path.
replSetTest.start(primary, {noCleanData: true, setParameter: makeServerParameters()});

primary = replSetTest.getPrimary();

// Verify that the 'stockPrice' and the 'system.change_collection' now have the inserted document.
// This document was inserted by applying oplog entries during the startup recovery.
assert.eq(countMatching(primary.getDB("test").stockPrice, userDocFilter),
          1,
          "Expected the document in 'stockPrice' after the startup recovery");
assert.eq(countMatching(primary.getDB("config").system.change_collection, insertEntryFilter),
          1,
          "Expected the insert entry in 'system.change_collection' after the startup recovery");

// Get the oplog timestamp up to this point. All oplog entries up to this timestamp must exist in
// the change collection. Only the newest entry is fetched (reverse natural order, limit 1) rather
// than reading the whole oplog into memory.
const lastOplogEntry =
    primary.getDB("local").oplog.rs.find().sort({$natural: -1}).limit(1).toArray()[0];
// Check the entry itself first: dereferencing '.ts' on a missing entry would throw a TypeError
// before any assertion about the timestamp could report a useful failure.
assert(lastOplogEntry !== undefined, "Expected the oplog to contain at least one entry");
const endTimestamp = lastOplogEntry.ts;
assert(endTimestamp !== undefined);

// Verify that the oplog and the change collection entries between the ['startTimestamp',
// 'endTimestamp'] window are exactly same and in the same order.
// TODO SERVER-69115 Pass the tenant id to the 'verifyChangeCollectionEntries'.
verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp);

replSetTest.stopSet();
})();