summaryrefslogtreecommitdiff
path: root/jstests/serverless/change_collection_expired_document_remover.js
blob: 8e04fed57624f028b576628b90ebc3d5ec747bb6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/**
 * Tests the change collection periodic remover job.
 *
 * @tags: [requires_fcv_62]
 */

(function() {
"use strict";

// For configureFailPoint.
load("jstests/libs/fail_point_util.js");
// For assertDropAndRecreateCollection.
load("jstests/libs/collection_drop_recreate.js");
// For ChangeStreamMultitenantReplicaSetTest.
load("jstests/serverless/libs/change_collection_util.js");
// For FeatureFlagUtil.
load("jstests/libs/feature_flag_util.js");

// Shorthand for the helper that creates a connection stamped with a tenant id.
const getTenantConnection = ChangeStreamMultitenantReplicaSetTest.getTenantConnection;

// Sleep interval in seconds for the change collection remover job.
const kExpiredRemovalJobSleepSeconds = 5;
// Number of seconds after which the documents in change collections will be expired.
const kExpireAfterSeconds = 1;
// Number of seconds to sleep before inserting the next batch of documents in collections. The
// wall time injected later in this test is the wall time of the last first-batch write plus
// 'kExpireAfterSeconds' plus 'kSafetyMarginMillis', and it is meant to precede the second batch.
const kSleepBetweenWritesSeconds = 5;
// Millisecond(s) that can be added to the wall time to advance it marginally.
const kSafetyMarginMillis = 1;
// To imitate 1-by-1 deletion we specify a low amount of bytes per marker.
const kMinBytesPerMarker = 1;

// Two-node multitenant replica set; every node is started with the marker size and the remover
// job sleep interval defined above.
const replSet = new ChangeStreamMultitenantReplicaSetTest({
    nodes: 2,
    nodeOptions: {
        setParameter: {
            changeCollectionTruncateMarkersMinBytes: kMinBytesPerMarker,
            changeCollectionExpiredDocumentsRemoverJobSleepSeconds: kExpiredRemovalJobSleepSeconds
        }
    }
});

// Both nodes are kept at hand because every check below is made on the primary and the secondary.
const primary = replSet.getPrimary();
const secondary = replSet.getSecondary();

// Asserts that the change collection 'changeColl' contains an insert entry for every document in
// 'expectedRetainedDocs' and no insert entry for any document in 'expectedDeletedDocs', looking
// only at entries recorded for the user collection 'stocksColl'. Each check is wrapped in
// 'assert.soonNoExcept'.
//
// 'changeColl'           - the change collection to query.
// 'stocksColl'           - the user collection whose insert entries are examined. Despite the
//                          parameter name, any collection may be passed; the namespace is taken
//                          from the collection itself.
// 'expectedDeletedDocs'  - documents whose insert entries must be absent; may be empty.
// 'expectedRetainedDocs' - documents whose insert entries must be present; may be empty.
function assertChangeCollectionDocuments(
    changeColl, stocksColl, expectedDeletedDocs, expectedRetainedDocs) {
    // Derive the namespace from the collection that was passed in rather than from a database
    // handle of the enclosing scope, so that the helper is correct for a collection in any
    // database and does not depend on declarations made elsewhere in the file.
    const collNss = stocksColl.getFullName();

    // Builds the pipeline that selects the insert entries for 'collNss', unwraps the inserted
    // document from the 'o' field and keeps only the documents listed in 'collectionEntries'.
    const pipeline = (collectionEntries) => [{$match: {op: "i", ns: collNss}},
                                             {$replaceRoot: {"newRoot": "$o"}},
                                             {$match: {$or: collectionEntries}}];

    // Assert that querying for 'expectedRetainedDocs' yields documents that are exactly the same as
    // 'expectedRetainedDocs'. The check is skipped for an empty list, in which case the '$or'
    // would receive an empty array.
    if (expectedRetainedDocs.length > 0) {
        assert.soonNoExcept(() => {
            const retainedDocs = changeColl.aggregate(pipeline(expectedRetainedDocs)).toArray();
            assert.eq(retainedDocs, expectedRetainedDocs);
            return true;
        });
    }

    // Assert that the query for any 'expectedDeletedDocs' yields no results. Skipped for an empty
    // list for the same reason as above.
    if (expectedDeletedDocs.length > 0) {
        assert.soonNoExcept(() => {
            const deletedDocs = changeColl.aggregate(pipeline(expectedDeletedDocs)).toArray();
            assert.eq(deletedDocs.length, 0);
            return true;
        });
    }
}

// Finds the oplog entry on the primary whose 'o' field matches 'doc', asserts that the entry
// exists and returns the result of 'getTime()' on that entry's 'wall' field.
function getDocumentOperationTime(doc) {
    const oplog = primary.getDB("local").oplog.rs;
    const matchingEntry = oplog.findOne({o: doc});
    assert(matchingEntry);

    const {wall} = matchingEntry;
    return wall.getTime();
}

// Hard-code the tenants' information such that the tenants can be identified deterministically.
const stocksTenantInfo = {
    tenantId: ObjectId("6303b6bb84305d2266d0b779"),
    user: "stock"
};
const citiesTenantInfo = {
    tenantId: ObjectId("7303b6bb84305d2266d0b779"),
    user: "cities"
};
const notUsedTenantInfo = {
    tenantId: ObjectId("8303b6bb84305d2266d0b779"),
    user: "notUser"
};

// Create connections to the primary such that they have respective tenant ids stamped.
const stocksTenantConnPrimary =
    getTenantConnection(primary.host, stocksTenantInfo.tenantId, stocksTenantInfo.user);
const citiesTenantConnPrimary =
    getTenantConnection(primary.host, citiesTenantInfo.tenantId, citiesTenantInfo.user);

// Create a tenant connection associated with 'notUsedTenantInfo.tenantId' such that only the tenant
// id exists in the replica set but no corresponding change collection exists. The purging job
// should safely ignore this tenant without any side-effects.
const notUsedTenantConnPrimary =
    getTenantConnection(primary.host, notUsedTenantInfo.tenantId, notUsedTenantInfo.user);

// Create connections to the secondary such that they have respective tenant ids stamped.
const stocksTenantConnSecondary =
    getTenantConnection(secondary.host, stocksTenantInfo.tenantId, stocksTenantInfo.user);
const citiesTenantConnSecondary =
    getTenantConnection(secondary.host, citiesTenantInfo.tenantId, citiesTenantInfo.user);

// Enable change streams for the 'stocks' and the 'cities' tenants only.
replSet.setChangeStreamState(stocksTenantConnPrimary, true);
replSet.setChangeStreamState(citiesTenantConnPrimary, true);

// Verify the change streams state for all tenants; it must remain disabled for the 'notUsed'
// tenant.
assert.eq(replSet.getChangeStreamState(stocksTenantConnPrimary), true);
assert.eq(replSet.getChangeStreamState(citiesTenantConnPrimary), true);
assert.eq(replSet.getChangeStreamState(notUsedTenantConnPrimary), false);

// Get the tenants' respective change collections on the primary.
const stocksChangeCollectionPrimary =
    stocksTenantConnPrimary.getDB("config").system.change_collection;
const citiesChangeCollectionPrimary =
    citiesTenantConnPrimary.getDB("config").system.change_collection;

// Get the tenants' respective change collections on the secondary.
const stocksChangeCollectionSecondary =
    stocksTenantConnSecondary.getDB("config").system.change_collection;
const citiesChangeCollectionSecondary =
    citiesTenantConnSecondary.getDB("config").system.change_collection;

// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds' for each tenant that has change streams
// enabled.
assert.commandWorked(stocksTenantConnPrimary.getDB("admin").runCommand(
    {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
assert.commandWorked(citiesTenantConnPrimary.getDB("admin").runCommand(
    {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));

// Get the tenants' respective databases for testing. All three use the same database name, each
// through its own tenant connection.
const stocksTestDb = stocksTenantConnPrimary.getDB(jsTestName());
const citiesTestDb = citiesTenantConnPrimary.getDB(jsTestName());
const notUsedTestDb = notUsedTenantConnPrimary.getDB(jsTestName());

// Start every tenant with a freshly created test collection.
const stocksColl = assertDropAndRecreateCollection(stocksTestDb, "stocks");
const citiesColl = assertDropAndRecreateCollection(citiesTestDb, "cities");
const notUsedColl = assertDropAndRecreateCollection(notUsedTestDb, "notUsed");

// Enable the 'hangBeforeRemovingExpiredChanges' failpoint on both nodes and wait until the remover
// job hangs on each of them. These are declared with 'let' because the failpoints are configured
// a second time further below.
let fpHangBeforeRemovingDocsPrimary =
    configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
let fpHangBeforeRemovingDocsSecondary =
    configureFailPoint(secondary, "hangBeforeRemovingExpiredChanges");
fpHangBeforeRemovingDocsPrimary.wait();
fpHangBeforeRemovingDocsSecondary.wait();

// The 5 documents for the 'stocks' collection owned by the 'stocks' tenant that should be
// deleted from the change collection.
const stocksExpiredDocuments = [
    {_id: "aapl", price: 140},
    {_id: "dis", price: 100},
    {_id: "nflx", price: 185},
    {_id: "baba", price: 66},
    {_id: "amc", price: 185}
];

// The 4 documents for the 'cities' collection owned by the 'cities' tenant that should be
// deleted from the change collection.
const citiesExpiredDocuments = [
    {_id: "toronto", area_km2: 630},
    {_id: "singapore ", area_km2: 728},
    {_id: "london", area_km2: 1572},
    {_id: "tokyo", area_km2: 2194}
];

// Insert documents to the 'stocks' collection and wait for the replication.
assert.commandWorked(stocksColl.insertMany(stocksExpiredDocuments));
replSet.awaitReplication();

// Verify that the change collection for the 'stocks' tenant is consistent on both the primary and
// the secondary.
assertChangeCollectionDocuments(stocksChangeCollectionPrimary,
                                stocksColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ stocksExpiredDocuments);
assertChangeCollectionDocuments(stocksChangeCollectionSecondary,
                                stocksColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ stocksExpiredDocuments);

// Insert documents to the 'cities' collection and wait for the replication.
assert.commandWorked(citiesColl.insertMany(citiesExpiredDocuments));
replSet.awaitReplication();

// Verify that the change collection for the 'cities' tenant is consistent on both the primary and
// the secondary.
assertChangeCollectionDocuments(citiesChangeCollectionPrimary,
                                citiesColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ citiesExpiredDocuments);
assertChangeCollectionDocuments(citiesChangeCollectionSecondary,
                                citiesColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ citiesExpiredDocuments);

// Insert 2 documents to the 'notUsed' collection such that the associated tenant becomes visible to
// the mongoD. The documents in these collections will not be consumed by the change stream.
const notUsedDocuments =
    [{_id: "cricket_bat", since_years: 2}, {_id: "tennis_racket", since_years: 2}];
assert.commandWorked(notUsedColl.insertMany(notUsedDocuments));

// All documents up to and including this wall time will be deleted by the purging job. The last
// 'cities' document is used because it is the last write of the first batch that is recorded in a
// change collection.
const lastExpiredDocumentTime = getDocumentOperationTime(citiesExpiredDocuments.at(-1));

// Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts
// has a sufficient delay in their wall time relative to the previous batch.
sleep(kSleepBetweenWritesSeconds * 1000);

// The documents for the 'stocks' collection owned by the 'stocks' tenant that should not be
// deleted from the change collection.
const stocksNonExpiredDocuments = [
    {_id: "wmt", price: 11},
    {_id: "coin", price: 23},
    {_id: "ddog", price: 15},
    {_id: "goog", price: 199},
    {_id: "tsla", price: 12}
];

// The documents for the 'cities' collection owned by the 'cities' tenant that should not be
// deleted from the change collection.
const citiesNonExpiredDocuments = [
    {_id: "dublin", area_km2: 117},
    {_id: "new york", area_km2: 783},
    {_id: "hong kong", area_km2: 1114},
    {_id: "sydney", area_km2: 12386}
];

// Insert documents to the 'stocks' collection and wait for the replication.
assert.commandWorked(stocksColl.insertMany(stocksNonExpiredDocuments));
replSet.awaitReplication();

// Verify the state of the change collection on both the primary and the secondary.
assertChangeCollectionDocuments(stocksChangeCollectionPrimary,
                                stocksColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ stocksNonExpiredDocuments);
assertChangeCollectionDocuments(stocksChangeCollectionSecondary,
                                stocksColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ stocksNonExpiredDocuments);

// Insert documents to the 'cities' collection and wait for the replication.
assert.commandWorked(citiesColl.insertMany(citiesNonExpiredDocuments));
replSet.awaitReplication();

// Verify the state of the change collection on both the primary and the secondary.
assertChangeCollectionDocuments(citiesChangeCollectionPrimary,
                                citiesColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ citiesNonExpiredDocuments);
assertChangeCollectionDocuments(citiesChangeCollectionSecondary,
                                citiesColl,
                                /* expectedDeletedDocs */[],
                                /* expectedRetainedDocs */ citiesNonExpiredDocuments);

// Calculate the 'currentWallTime' such that only the first batch of inserted documents
// should be expired, i.e.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' <
// 'currentWallTime' < first-non-expired-document.
const currentWallTime =
    new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis);
// The failpoint that injects the wall time has a different name depending on whether the
// 'UseUnreplicatedTruncatesForDeletions' feature flag is present and enabled.
const failpointName =
    FeatureFlagUtil.isPresentAndEnabled(stocksTestDb, "UseUnreplicatedTruncatesForDeletions")
    ? "injectCurrentWallTimeForCheckingMarkers"
    : "injectCurrentWallTimeForRemovingExpiredDocuments";
const fpInjectWallTimePrimary = configureFailPoint(primary, failpointName, {currentWallTime});
const fpInjectWallTimeSecondary = configureFailPoint(secondary, failpointName, {currentWallTime});

// Unblock the change collection remover job such that it picks up on the injected
// 'currentWallTime'.
fpHangBeforeRemovingDocsPrimary.off();
fpHangBeforeRemovingDocsSecondary.off();

// Wait until the remover job on each node has retrieved the injected 'currentWallTime'. The
// 'hangBeforeRemovingExpiredChanges' failpoint is enabled again only after that.
fpInjectWallTimePrimary.wait();
fpInjectWallTimeSecondary.wait();

// Wait for a complete cycle of the TTL job: enable the hang failpoint again and wait until the job
// reaches it on both nodes.
fpHangBeforeRemovingDocsPrimary = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
fpHangBeforeRemovingDocsSecondary =
    configureFailPoint(secondary, "hangBeforeRemovingExpiredChanges");
fpHangBeforeRemovingDocsPrimary.wait();
fpHangBeforeRemovingDocsSecondary.wait();

// Assert that only required documents are retained in change collections on the primary.
assertChangeCollectionDocuments(
    stocksChangeCollectionPrimary, stocksColl, stocksExpiredDocuments, stocksNonExpiredDocuments);
assertChangeCollectionDocuments(
    citiesChangeCollectionPrimary, citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments);

// Wait for the replication to complete and assert that the expired documents have also been deleted
// from the secondary and the state is consistent with the primary.
replSet.awaitReplication();
assertChangeCollectionDocuments(
    stocksChangeCollectionSecondary, stocksColl, stocksExpiredDocuments, stocksNonExpiredDocuments);
assertChangeCollectionDocuments(
    citiesChangeCollectionSecondary, citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments);

// Release the remover job on both nodes before shutting the replica set down.
fpHangBeforeRemovingDocsPrimary.off();
fpHangBeforeRemovingDocsSecondary.off();

replSet.stopSet();
})();