1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
// Contains functions for testing the change collections.
// Asserts that, for tenant 'tenantId', the oplog and the tenant's change collection hold the same
// entries inside the timestamp window (startOplogTimestamp, endOplogTimestamp].
//
// 'connection' is used to read 'local.oplog.rs' directly; its 'host' is used to open a separate
// tenant connection through which 'config.system.change_collection' is read. Any discrepancy is
// reported by a failing assertion.
function verifyChangeCollectionEntries(
    connection, startOplogTimestamp, endOplogTimestamp, tenantId) {
    // The window is exclusive at the start and inclusive at the end: the start timestamp is
    // expected to be taken just before the first write, and the end timestamp is expected to be
    // the timestamp of the final write in the test.
    const oplogFilter = {
        $and: [
            {ts: {$gt: startOplogTimestamp}},
            {ts: {$lte: endOplogTimestamp}},
            {tid: tenantId}
        ]
    };
    const oplogDocs = connection.getDB("local").oplog.rs.find(oplogFilter).toArray();

    // The change collection is filtered on '_id' rather than 'ts', and needs no 'tid' predicate
    // because it is read over a connection obtained for this tenant.
    const tenantConn =
        ChangeStreamMultitenantReplicaSetTest.getTenantConnection(connection.host, tenantId);
    const changeCollFilter = {
        $and: [{_id: {$gt: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]
    };
    const changeCollDocs =
        tenantConn.getDB("config").system.change_collection.find(changeCollFilter).toArray();

    // Both sources must return the same number of documents.
    const countMismatchMsg =
        "Number of entries in the oplog and the change collection with tenantId: " + tenantId +
        " is not the same. Oplog has total " + oplogDocs.length +
        " entries , change collection has total " + changeCollDocs.length +
        " entries, change collection entries " + tojson(changeCollDocs);
    assert.eq(oplogDocs.length, changeCollDocs.length, countMismatchMsg);

    // Compare the documents pairwise, in the order in which each cursor returned them.
    oplogDocs.forEach((oplogDoc, position) => {
        const changeCollDoc = changeCollDocs[position];

        // The change collection document must carry an '_id' equal to the oplog document's 'ts'.
        assert(changeCollDoc.hasOwnProperty("_id"));
        const idVersusTs = timestampCmp(changeCollDoc._id, oplogDoc.ts);
        const idMismatchMsg = "Change collection with tenantId: " + tenantId +
            " '_id' field: " + tojson(changeCollDoc._id) +
            " is not same as the oplog 'ts' field: " + tojson(oplogDoc.ts);
        assert.eq(idVersusTs, 0, idMismatchMsg);

        // Oplog documents have no '_id', so drop it before comparing the remaining fields.
        delete changeCollDoc._id;
        const entryMismatchMsg = "Oplog and change collection with tenantId: " + tenantId +
            " entries are not same. Oplog entry: " + tojson(oplogDoc) +
            ", change collection entry: " + tojson(changeCollDoc);
        assert.eq(oplogDoc, changeCollDoc, entryMismatchMsg);
    });
}
// Replica set fixture for tests that exercise change collections in a multitenant environment.
// Constructing it starts and initiates the set with the multitenancy-related server parameters and
// creates a root user; the remaining members are helpers for obtaining tenant connections and for
// setting or reading a tenant's change stream state.
class ChangeStreamMultitenantReplicaSetTest extends ReplSetTest {
    // Builds the underlying 'ReplSetTest' from 'config', then starts and initiates the replica set
    // and creates the "root" user on the primary.
    constructor(config) {
        super(config);

        // TODO SERVER-67267 Add 'serverless' flag.
        // These entries are merged last, so they take precedence over same-named entries supplied
        // by the caller through 'config.setParameter'.
        const requiredParameters = {
            featureFlagServerlessChangeStreams: true,
            multitenancySupport: true,
            featureFlagMongoStore: true,
            featureFlagRequireTenantID: true
        };
        const setParameter = Object.assign({}, config.setParameter || {}, requiredParameters);
        this.startSet({setParameter});
        this.initiate();

        // The root user is what later allows '$tenant' to be passed to commands in this
        // multitenant environment.
        const adminDb = this.getPrimary().getDB("admin");
        const createRootCmd = {createUser: "root", pwd: "pwd", roles: ["root"]};
        assert.commandWorked(adminDb.runCommand(createRootCmd));
    }

    // Opens a new connection to 'hostAddr', creates the user described by 'createUser' for
    // 'tenantId', and returns the connection with a security token for that user and tenant set on
    // it. When 'createUser' is omitted, the user is named after a fresh ObjectId and is given the
    // 'readWriteAnyDatabase' role on 'admin'.
    static getTenantConnection(hostAddr, tenantId, createUser = {
        user: ObjectId().str,
        roles: [{role: 'readWriteAnyDatabase', db: 'admin'}]
    }) {
        const conn = new Mongo(hostAddr);

        // Authenticate as root (which has 'ActionType::useTenant') so that '$tenant' can be used
        // in the command below.
        const loggedIn = conn.getDB("admin").auth("root", "pwd");
        assert(loggedIn);

        // Create the requested user on '$external' for the tenant.
        const createUserCmd = {
            createUser: createUser.user,
            '$tenant': tenantId,
            roles: createUser.roles
        };
        assert.commandWorked(conn.getDB("$external").runCommand(createUserCmd));

        // Stamp the connection with a security token carrying the user and the tenant id.
        const securityToken =
            _createSecurityToken({user: createUser.user, db: '$external', tenant: tenantId});
        conn._setSecurityToken(securityToken);

        // Drop the root session to avoid multiple authentication on the returned connection.
        conn.getDB("admin").logout();
        return conn;
    }

    // Runs 'setChangeStreamState' with the given 'enabled' value over 'tenantConn' and asserts
    // that the command succeeded.
    setChangeStreamState(tenantConn, enabled) {
        const command = {setChangeStreamState: 1, enabled};
        assert.commandWorked(tenantConn.getDB("admin").runCommand(command));
    }

    // Runs 'getChangeStreamState' over 'tenantConn', asserts that the command succeeded, and
    // returns the 'enabled' field of its reply.
    getChangeStreamState(tenantConn) {
        const reply =
            assert.commandWorked(tenantConn.getDB("admin").runCommand({getChangeStreamState: 1}));
        return reply.enabled;
    }
}
|