summaryrefslogtreecommitdiff
path: root/jstests/serverless/libs/change_collection_util.js
blob: f9b8a9c68469dc1d48e077de54855692d010cd07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Contains functions for testing the change collections.

// Verifies that the oplog and change collection entries are the same for the provided tenant
// 'tenantId' for the specified timestamp window:- (startOplogTimestamp, endOplogTimestamp].
//
// Parameters:
//   connection          - connection whose 'local.oplog.rs' is read; its 'host' is also used to
//                         open the tenant connection for reading the change collection.
//   startOplogTimestamp - exclusive lower bound of the window.
//   endOplogTimestamp   - inclusive upper bound of the window.
//   tenantId            - tenant whose oplog entries ('tid' field) and change collection are
//                         compared.
//
// Throws (through the assertion helpers) when the number of entries differs, when a change
// collection '_id' does not equal the corresponding oplog 'ts', or when the remaining fields of a
// pair of entries differ.
function verifyChangeCollectionEntries(
    connection, startOplogTimestamp, endOplogTimestamp, tenantId) {
    // Fetch the oplog documents for the provided tenant for the specified timestamp window. Note
    // that the startOplogTimestamp is expected to be just before the first write, while the
    // endOplogTimestamp is expected to be the timestamp of the final write in the test.
    //
    // The entries are compared index by index further down, so request an explicit order rather
    // than depending on the order in which an unsorted query happens to return documents.
    const oplogColl = connection.getDB("local").oplog.rs;
    const oplogEntries = oplogColl
                             .find({
                                 $and: [
                                     {ts: {$gt: startOplogTimestamp}},
                                     {ts: {$lte: endOplogTimestamp}},
                                     {tid: tenantId}
                                 ]
                             })
                             .sort({ts: 1})
                             .toArray();

    // Fetch all documents from the tenant's change collection for the specified timestamp window,
    // ordered by '_id' so that they line up with the oplog entries ordered by 'ts'.
    const changeColl =
        ChangeStreamMultitenantReplicaSetTest.getTenantConnection(connection.host, tenantId)
            .getDB("config")
            .system.change_collection;
    const changeCollectionEntries =
        changeColl
            .find({$and: [{_id: {$gt: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
            .sort({_id: 1})
            .toArray();

    // Verify that the number of documents returned by the oplog and the tenant's change collection
    // are exactly the same.
    assert.eq(oplogEntries.length,
              changeCollectionEntries.length,
              "Number of entries in the oplog and the change collection with tenantId: " +
                  tenantId + " is not the same. Oplog has total " + oplogEntries.length +
                  " entries , change collection has total " + changeCollectionEntries.length +
                  " entries, change collection entries " + tojson(changeCollectionEntries));

    // Verify that the documents in the change collection are exactly the same as the oplog for a
    // particular tenant.
    for (let idx = 0; idx < oplogEntries.length; idx++) {
        const oplogEntry = oplogEntries[idx];
        const changeCollectionEntry = changeCollectionEntries[idx];

        // The change collection '_id' must mirror the oplog 'ts'. Once that is confirmed, drop
        // '_id' from the change collection entry because the oplog entry does not have it.
        assert(changeCollectionEntry.hasOwnProperty("_id"),
               "Change collection with tenantId: " + tenantId +
                   " has an entry without the '_id' field: " + tojson(changeCollectionEntry));
        assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts),
                  0,
                  "Change collection with tenantId: " + tenantId +
                      " '_id' field: " + tojson(changeCollectionEntry._id) +
                      " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts));
        delete changeCollectionEntry["_id"];

        // Verify that the oplog and change collection entry (after removing the '_id' field) are
        // the same.
        assert.eq(oplogEntry,
                  changeCollectionEntry,
                  "Oplog and change collection with tenantId: " + tenantId +
                      " entries are not same. Oplog entry: " + tojson(oplogEntry) +
                      ", change collection entry: " + tojson(changeCollectionEntry));
    }
}

// Test fixture built on top of 'ReplSetTest' that brings up a replica set configured for a
// multitenant environment so that change collections can be used. It also bundles helpers that are
// commonly needed when working with change collections.
class ChangeStreamMultitenantReplicaSetTest extends ReplSetTest {
    constructor(config) {
        // Let 'ReplSetTest' process the configuration first.
        super(config);

        // Server parameters required by this fixture. They are merged last, so they override any
        // same-named values supplied by the caller through 'config.setParameter'.
        // TODO SERVER-67267 Add 'serverless' flag.
        const multitenantParameters = {
            featureFlagServerlessChangeStreams: true,
            multitenancySupport: true,
            featureFlagMongoStore: true,
            featureFlagRequireTenantID: true
        };
        const callerParameters = config.setParameter || {};
        const setParameter = Object.assign({}, callerParameters, multitenantParameters);

        // Start and initialize the replica set with the merged parameters.
        this.startSet({setParameter});
        this.initiate();

        // Create a root user within the multitenant environment to enable passing '$tenant' to
        // commands.
        const primaryAdminDb = this.getPrimary().getDB("admin");
        const createRootUserReply =
            primaryAdminDb.runCommand({createUser: "root", pwd: "pwd", roles: ["root"]});
        assert.commandWorked(createRootUserReply);
    }

    // Opens a new connection to 'hostAddr', creates the user described by 'createUser' for the
    // tenant 'tenantId', and returns the connection with a security token for that user and tenant
    // set on it. When 'createUser' is omitted, a user named after a freshly generated ObjectId
    // with the 'readWriteAnyDatabase' role is created.
    static getTenantConnection(hostAddr, tenantId, createUser = {
        user: ObjectId().str,
        roles: [{role: 'readWriteAnyDatabase', db: 'admin'}]
    }) {
        const conn = new Mongo(hostAddr);
        const adminDb = conn.getDB("admin");

        // Authenticate as the root user, which has 'ActionType::useTenant', so that '$tenant' can
        // be passed to the command below.
        const authenticated = adminDb.auth("root", "pwd");
        assert(authenticated);

        // Create the requested user for the tenant.
        const createUserCmd = {
            createUser: createUser.user,
            '$tenant': tenantId,
            roles: createUser.roles
        };
        assert.commandWorked(conn.getDB("$external").runCommand(createUserCmd));

        // Stamp the connection with a security token carrying the user and the tenant id.
        const securityToken =
            _createSecurityToken({user: createUser.user, db: '$external', tenant: tenantId});
        conn._setSecurityToken(securityToken);

        // Drop the root session to avoid multiple authentication on the returned connection.
        adminDb.logout();

        return conn;
    }

    // Runs 'setChangeStreamState' with the given 'enabled' value on the provided tenant connection
    // and asserts that the command worked.
    setChangeStreamState(tenantConn, enabled) {
        const reply =
            tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled});
        assert.commandWorked(reply);
    }

    // Runs 'getChangeStreamState' on the provided tenant connection, asserts that the command
    // worked, and returns the 'enabled' field of the reply.
    getChangeStreamState(tenantConn) {
        const reply = tenantConn.getDB("admin").runCommand({getChangeStreamState: 1});
        return assert.commandWorked(reply).enabled;
    }
}