summaryrefslogtreecommitdiff
path: root/jstests/serverless/basic_write_to_change_collection.js
blob: e67b299fdace08708c26fc9383df57439a03b7c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Tests that entries are written to the change collection for collection create, drop and document
// modification operations.
// @tags: [
//   featureFlagMongoStore,
//   requires_fcv_62,
// ]
(function() {
"use strict";

// For verifyChangeCollectionEntries and ChangeStreamMultitenantReplicaSetTest.
load("jstests/serverless/libs/change_collection_util.js");
// For funWithArgs.
load('jstests/libs/parallel_shell_helpers.js');

// TODO SERVER-69115 Change to a 2-node replica set.
const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});

const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();

const testDb = primary.getDB("test");

// Hard code tenants ids such that a particular tenant can be identified deterministically.
const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");

// Connections to the replica set primary that are stamped with their respective tenant ids.
const firstTenantConn =
    ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
const secondTenantConn =
    ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);

// Enable the change stream state such that change collections are created for both tenants.
replSetTest.setChangeStreamState(firstTenantConn, true);
replSetTest.setChangeStreamState(secondTenantConn, true);

// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are
// captured by the tenant's change collection.
function performWrites(coll, docIds) {
    docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId})));
    docIds.forEach(
        docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}})));
}

// Retrieve the last timestamp from the oplog.
function getLatestTimestamp() {
    const oplogColl = primary.getDB("local").oplog.rs;
    const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts;
    assert(oplogTimestamp !== undefined);
    return oplogTimestamp;
}

// Test that writes to two different change collections are isolated and that each change collection
// captures only the relevant oplog entries associated with the corresponding tenant.
(function testWritesWithMultipleTenants() {
    jsTestLog("Testing writes on change collections with multiple tenants.");

    // A helper shell function to perform write for the specified 'tenantId'.
    function shellFn(hostAddr, collName, tenantId, performWrites) {
        load("jstests/serverless/libs/change_collection_util.js");

        const tenantConn =
            ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);

        const docIds = Array.from({length: 300}, (_, index) => index);
        performWrites(tenantConn.getDB("test").getCollection(collName), docIds);

        assert(tenantConn.getDB("test").getCollection(collName).drop());
    }

    const startOplogTimestamp = getLatestTimestamp();

    // Perform writes for the first tenant in a different shell.
    const firstTenantShellReturn =
        startParallelShell(funWithArgs(shellFn,
                                       primary.host,
                                       "testWritesWithMultipleTenants_firstTenant",
                                       firstTenantId,
                                       performWrites),
                           primary.port);

    // Perform writes to the second tenant parallely with the first tenant.
    const secondTenantShellReturn =
        startParallelShell(funWithArgs(shellFn,
                                       primary.host,
                                       "testWritesWithMultipleTenants_secondTenant",
                                       secondTenantId,
                                       performWrites),
                           primary.port);

    // Wait for both shells to return.
    firstTenantShellReturn();
    secondTenantShellReturn();

    const endOplogTimestamp = getLatestTimestamp();
    assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);

    // Verify that both change collections captured their respective tenant's oplog entries in
    // the primary.
    verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
    verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);

    // TODO SERVER-69115 Uncomment this.
    /**
    //Wait for the replication to finish.
    replSetTest.awaitReplication();
    // Verify that both change collections captured their respective tenant's oplog entries in
    // the secondary.
    verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
    verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
    secondTenantId);
    */
})();

// Test that transactional writes to two different change collections are isolated and that each
// change collection captures only the relevant 'applyOps' oplog entries associated with the
// corresponding tenant.
(function testTransactionalWritesWithMultipleTenants() {
    jsTestLog("Testing transactional writes on change collections with multiple tenants.");

    // A helper shell function to perform transactional write for the specified 'tenantId'.
    function shellFn(hostAddr, collName, tenantId, performWrites) {
        load("jstests/serverless/libs/change_collection_util.js");

        const tenantConn =
            ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);

        const session = tenantConn.getDB("test").getMongo().startSession();
        const sessionDb = session.getDatabase("test");

        session.startTransaction();

        const docIds = Array.from({length: 300}, (_, index) => index);
        performWrites(sessionDb.getCollection(collName), docIds);

        session.commitTransaction_forTesting();
    }

    const startOplogTimestamp = getLatestTimestamp();

    // Perform writes within a transaction for the first tenant.
    const firstTenantShellReturn =
        startParallelShell(funWithArgs(shellFn,
                                       primary.host,
                                       "testTransactionalWritesWithMultipleTenants_firstTenant",
                                       firstTenantId,
                                       performWrites),
                           primary.port);

    // Perform parallel writes within a transaction for the second tenant.
    const secondTenantShellReturn =
        startParallelShell(funWithArgs(shellFn,
                                       primary.host,
                                       "testTransactionalWritesWithMultipleTenants_secondTenant",
                                       secondTenantId,
                                       performWrites),
                           primary.port);

    // Wait for shells to return.
    firstTenantShellReturn();
    secondTenantShellReturn();

    const endOplogTimestamp = getLatestTimestamp();
    assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);

    // Verify that both change collections captured their respective tenant's 'applyOps' oplog
    // entries in the primary.
    verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
    verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);

    // TODO SERVER-69115 Uncomment this.
    /**
    // Wait for the replication to finish.
    replSetTest.awaitReplication();
    // Verify that both change collections captured their respective tenant's 'applyOps' oplog
    // entries in the secondary.
    verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
    verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
    secondTenantId);
    */
})();

replSetTest.stopSet();
}());