summaryrefslogtreecommitdiff
path: root/jstests/serverless/change_collection_server_stats.js
blob: a5992259d786831e4f43d6bf4a71502e68b27c32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Tests that FTDC collects information about the change collection, including its purging job.
 * @tags: [ requires_fcv_62 ]
 */
(function() {
'use strict';

load("jstests/libs/fail_point_util.js");
load('jstests/libs/ftdc.js');

// Value passed to the 'changeCollectionRemoverJobSleepSeconds' server parameter at startup.
const kExpiredChangeRemovalJobSleepSeconds = 5;
// Value later assigned to the 'changeStreams.expireAfterSeconds' cluster parameter.
const kExpireAfterSeconds = 1;

const rst = new ReplSetTest({nodes: 1});
rst.startSet({
    setParameter: {
        featureFlagServerlessChangeStreams: 1,
        changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds,
    }
});
rst.initiate();

const primary = rst.getPrimary();
const adminDb = primary.getDB('admin');
const testDb = primary.getDB(jsTestName());
const changeCollection = primary.getDB("config").system.change_collection;

/**
 * Returns the 'serverStatus.changeCollections.purgingJob' section of the document produced by
 * 'verifyGetDiagnosticData()' for the primary's admin database.
 */
function getPurgingJobDiagnostics() {
    return verifyGetDiagnosticData(adminDb).serverStatus.changeCollections.purgingJob;
}

// Enable change streams to ensure the creation of change collections if run in serverless mode.
assert.commandWorked(adminDb.runCommand({setChangeStreamState: 1, enabled: true}));
assert.soon(() => {
    // Ensure that server status diagnostics is collecting change collection statistics.
    const serverStatusDiagnostics = verifyGetDiagnosticData(adminDb).serverStatus;
    return serverStatusDiagnostics.hasOwnProperty('changeCollections') &&
        serverStatusDiagnostics.changeCollections.hasOwnProperty('purgingJob');
});

// Baseline purging job counters; every later check is made relative to this snapshot.
const diagnosticsBeforeTestCollInsertions = getPurgingJobDiagnostics();

// Create collection and insert sample data.
assert.commandWorked(testDb.createCollection("testColl"));
const numberOfDocuments = 1000;
for (let i = 0; i < numberOfDocuments; i++) {
    assert.commandWorked(testDb.testColl.insert({x: i}));
}

// Wall time, in milliseconds, of the oldest entry currently in the change collection.
const wallTimeOfTheFirstOplogEntry =
    new NumberLong(changeCollection.find().sort({wall: 1}).limit(1).next().wall.getTime());

// Total BSON size of every change collection entry except the most recent one (sorting by
// descending wall time and skipping one entry drops the newest).
const estimatedToBeRemovedDocsSize = changeCollection.find()
                                         .sort({wall: -1})
                                         .skip(1)
                                         .toArray()
                                         .map(doc => Object.bsonsize(doc))
                                         .reduce((acc, size) => acc + size, 0);
assert.gt(estimatedToBeRemovedDocsSize, 0);

// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
assert.commandWorked(adminDb.runCommand(
    {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));

// Ensure purging job deletes the expired oplog entries about insertion into test collection.
assert.soon(() => {
    // All change collection entries are removed but one.
    return changeCollection.count() === 1;
});

// Ensure that FTDC collected the purging job information of the change collection.
assert.soon(() => {
    const before = diagnosticsBeforeTestCollInsertions;
    const after = getPurgingJobDiagnostics();

    // Compare the wall times as BSON values rather than as 'tojson()' strings: string comparison
    // is lexicographic, so its result depends on how the shell renders each value (digit count,
    // quoting) instead of on the numeric magnitude.
    const maxStartWallTimeIsRecentEnough =
        bsonWoCompare({wall: after.maxStartWallTimeMillis},
                      {wall: wallTimeOfTheFirstOplogEntry}) >= 0;

    return after.totalPass > before.totalPass &&
        after.scannedCollections > before.scannedCollections &&
        after.bytesDeleted > before.bytesDeleted + estimatedToBeRemovedDocsSize &&
        after.docsDeleted > before.docsDeleted + numberOfDocuments - 1 &&
        maxStartWallTimeIsRecentEnough && after.timeElapsedMillis > before.timeElapsedMillis;
});

rst.stopSet();
}());