1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
// Tests that replaying the oplog entries during the startup recovery also writes to the change
// collection.
// @tags: [
// requires_fcv_62,
// __TEMPORARILY_DISABLED__
// ]
(function() {
"use strict";

load("jstests/libs/fail_point_util.js");                    // For configureFailPoint.
load("jstests/serverless/libs/change_collection_util.js");  // For verifyChangeCollectionEntries.

// Builds the server parameters used both for the initial start and for the restart of the node.
// A fresh object is returned on every call so that the two call sites never share state.
const makeServerParameters = () => ({
    featureFlagServerlessChangeStreams: true,
    multitenancySupport: true,
    featureFlagMongoStore: true
});

// The document written to 'test.stockPrice' while checkpointing is paused.
const makeStockDocument = () => ({_id: "mdb", price: 250});

// Filter matching the insert entry of the above document in the oplog or the change collection.
const makeStockEntryFilter = () => ({ns: "test.stockPrice", o: makeStockDocument()});

// Number of matching documents in 'test.stockPrice' as seen through connection 'conn'.
const countInStockPrice = (conn) =>
    conn.getDB("test").stockPrice.find(makeStockDocument()).toArray().length;

// Number of matching insert entries in 'local.oplog.rs' as seen through connection 'conn'.
const countInOplog = (conn) =>
    conn.getDB("local").oplog.rs.find(makeStockEntryFilter()).toArray().length;

// Number of matching insert entries in 'config.system.change_collection' as seen through
// connection 'conn'.
const countInChangeCollection = (conn) =>
    conn.getDB("config").system.change_collection.find(makeStockEntryFilter()).toArray().length;

const replSetTest = new ReplSetTest({nodes: 1});

// TODO SERVER-67267 Add 'serverless' flag.
// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__'
// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'.
replSetTest.startSet({setParameter: makeServerParameters()});
replSetTest.initiate();

const originalPrimary = replSetTest.getPrimary();

// Enabling the change stream is what creates the change collection.
assert.commandWorked(
    originalPrimary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));

// Seed a document and remember the operation time of that insert. It is the inclusive lower bound
// of the window in which the oplog and the change collection entries are compared at the end.
const seedInsertReply = assert.commandWorked(originalPrimary.getDB("test").runCommand(
    {insert: "seedCollection", documents: [{_id: "beginTs"}]}));
const startTimestamp = seedInsertReply.operationTime;

// Pause the checkpoint thread so that non-journaled collections, including the change collection,
// will not be persisted.
const checkpointPause = configureFailPoint(originalPrimary, "pauseCheckpointThread");
checkpointPause.wait();

// Write the document whose recovery is under test.
assert.commandWorked(originalPrimary.getDB("test").stockPrice.insert(makeStockDocument()));

// Before the crash the document is visible in 'stockPrice', in 'oplog.rs' and in
// 'system.change_collection'.
assert.eq(countInStockPrice(originalPrimary), 1);
assert.eq(countInOplog(originalPrimary), 1);
assert.eq(countInChangeCollection(originalPrimary), 1);

// Kill the primary ungracefully and keep its db path directory for the restarts below.
replSetTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});

// Bring up a standalone mongoD on the db directory of the former primary.
const standalone = MongoRunner.runMongod(
    {dbpath: originalPrimary.dbpath, noReplSet: true, noCleanData: true});
assert.neq(null, standalone, "Fail to restart the node as standalone");

// In standalone mode the document must be missing from both 'stockPrice' and
// 'system.change_collection', while its entry must still be present in 'oplog.rs'.
assert.eq(countInStockPrice(standalone), 0);
assert.eq(countInOplog(standalone), 1);
assert.eq(countInChangeCollection(standalone), 0);

// Shut the standalone down, again leaving the db directory in place.
MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: true, wait: true});

// Restart the node as the replica set primary on the same db path.
replSetTest.start(originalPrimary, {noCleanData: true, setParameter: makeServerParameters()});
const recoveredPrimary = replSetTest.getPrimary();

// Applying the oplog entries during the startup recovery must have put the document back into
// both 'stockPrice' and 'system.change_collection'.
assert.eq(countInStockPrice(recoveredPrimary), 1);
assert.eq(countInChangeCollection(recoveredPrimary), 1);

// Take the timestamp of the last oplog entry as the inclusive upper bound of the window. All
// oplog entries up to this timestamp must exist in the change collection.
const oplogEntries = recoveredPrimary.getDB("local").oplog.rs.find().toArray();
const endTimestamp = oplogEntries.at(-1).ts;
assert(endTimestamp !== undefined);

// The oplog and the change collection entries within ['startTimestamp', 'endTimestamp'] must be
// exactly the same and in the same order.
// TODO SERVER-69115 Pass the tenant id to the 'verifyChangeCollectionEntries'.
verifyChangeCollectionEntries(recoveredPrimary, startTimestamp, endTimestamp);

replSetTest.stopSet();
})();
|