/**
 * Evaluate the behaviour of bucket closure when we simulate high cache pressure due to a
 * high-cardinality workload. After we hit a certain cardinality (the number of active buckets,
 * generated in this test by distinct metaField values), we expect buckets to be closed with a
 * smaller bucket size limit to alleviate pressure on the cache.
 *
 * @tags: [
 *   # Exclude in-memory engine, rollbacks due to pinned cache content rely on eviction.
 *   requires_persistence,
 *   requires_replication,
 *   requires_wiredtiger,
 * ]
 */
(function() {
"use strict";

load("jstests/core/timeseries/libs/timeseries.js");

const minWiredTigerCacheSizeGB = 0.256;
const cacheSize = minWiredTigerCacheSizeGB * 1000 * 1000 * 1000;  // 256 MB
const defaultBucketMaxSize = 128000;                              // 125 KB
const minBucketCount = 10;
const timeFieldName = 'time';
const metaFieldName = 'meta';
const timestamp = ISODate('2023-02-13T01:00:00Z');

// A cardinality higher than this calculated value will trigger a smaller bucket size limit due to
// cache pressure.
const cardinalityForCachePressure = Math.ceil(cacheSize / (2 * defaultBucketMaxSize));  // 1000
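// That is, the point at which this many buckets at the default max size would occupy half of the
// WiredTiger cache: 256,000,000 / (2 * 128,000) = 1000.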

const replSet = new ReplSetTest({
    nodes: 1,
    nodeOptions: {wiredTigerCacheSizeGB: minWiredTigerCacheSizeGB},
});
replSet.startSet({setParameter: {timeseriesBucketMaxSize: defaultBucketMaxSize}});
replSet.initiate();

const db = replSet.getPrimary().getDB(jsTestName());
const coll = db.getCollection('t');
coll.drop();
assert.commandWorked(db.createCollection(
    coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));

if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
    replSet.stopSet();
    jsTestLog(
        'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
    return;
}

// Helper to log timeseries stats.
const formatStatsLog = (stats) => {
    return "Timeseries stats: " + tojson(stats);
};

// Inserts documents into the collection with increasing meta fields to generate N buckets. We make
// sure each bucket reaches the bucket min count so that the large measurement checks are bypassed.
const initializeBucketsPastMinCount = function(numOfBuckets = 1) {
    jsTestLog("Inserting and generating buckets. Targeting '" + numOfBuckets + "' buckets.");
    let bulk = coll.initializeUnorderedBulkOp();
    for (let i = 0; i < numOfBuckets; i++) {
        for (let j = 0; j < minBucketCount; ++j) {
            const doc = {
                _id: '' + i + j,
                [timeFieldName]: timestamp,
                [metaFieldName]: i,
                value: "a".repeat(1000)
            };
            bulk.insert(doc);
        }
    }
    assert.commandWorked(bulk.execute());
};

const belowCardinalityThreshold = cardinalityForCachePressure;
initializeBucketsPastMinCount(belowCardinalityThreshold);
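// At this point we expect 'belowCardinalityThreshold' (1000) open buckets, each holding 10
// measurements of roughly 1 KB, i.e. about 10 KB per bucket, well below the 128 KB size limit.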

let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
let bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
let bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
let compressedBuckets = timeseriesStats.numCompressedBuckets;

// Ensure we have not closed any buckets due to size or cache pressure.
assert.eq(bucketsClosedDueToSize, 0, formatStatsLog(timeseriesStats));
assert.eq(bucketsClosedDueToCachePressure, 0, formatStatsLog(timeseriesStats));
assert.eq(timeseriesStats.bucketCount, belowCardinalityThreshold, formatStatsLog(timeseriesStats));

// Each pass inserts roughly 50 KB of measurement data per bucket (a 30 KB and a 20 KB value), so we
// only end up doing two passes before we start to close buckets due to the size limit.
while (bucketsClosedDueToSize == 0) {
    jsTestLog("Inserting 50000 bytes of data into buckets.");
    let bulk = coll.initializeUnorderedBulkOp();
    for (let i = 0; i < belowCardinalityThreshold; i++) {
        bulk.insert({
            _id: '00' + i,
            [timeFieldName]: timestamp,
            [metaFieldName]: i,
            value: "a".repeat(30000)
        });
        bulk.insert({
            _id: '00' + i,
            [timeFieldName]: timestamp,
            [metaFieldName]: i,
            value: "a".repeat(20000)
        });
    }
    assert.commandWorked(bulk.execute());

    timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
    bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
    bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
    compressedBuckets = timeseriesStats.numCompressedBuckets;
}

// On the second pass of inserts, we will close buckets due to the default size constraints. No
// buckets should be closed due to cache pressure.
assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure, formatStatsLog(timeseriesStats));
assert.eq(bucketsClosedDueToCachePressure, 0, formatStatsLog(timeseriesStats));
assert.eq(compressedBuckets, cardinalityForCachePressure, formatStatsLog(timeseriesStats));

// Once we exceed the cardinality threshold, simulating cache pressure, we should begin to see
// buckets closed due to 'CachePressure' rather than 'DueToSize'.
const aboveCardinalityThreshold = cardinalityForCachePressure * 3 / 2;  // 1500
initializeBucketsPastMinCount(aboveCardinalityThreshold);

let bulk = coll.initializeUnorderedBulkOp();
for (let i = 0; i < aboveCardinalityThreshold; i++) {
    bulk.insert(
        {_id: '00' + i, [timeFieldName]: timestamp, [metaFieldName]: i, value: "a".repeat(20000)});
}
assert.commandWorked(bulk.execute());

timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
compressedBuckets = timeseriesStats.numCompressedBuckets;

// We expect 'bucketsClosedDueToSize' to remain the same but 'bucketsClosedDueToCachePressure' to
// increase.
assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure, formatStatsLog(timeseriesStats));

// Previously, the bucket max size was 128000 bytes, but under cache pressure with
// 'aboveCardinalityThreshold' active buckets, the limit drops to roughly 85334 bytes. This means
// the older buckets (up to 'cardinalityForCachePressure') need to be closed since they are sized at
// ~120000 bytes, while the newly inserted measurements only amount to ~(20000 * 3) bytes, so those
// buckets stay open.
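// (For reference, 85334 matches the threshold formula above inverted: cacheSize /
// (2 * aboveCardinalityThreshold) = 256,000,000 / (2 * 1500) ≈ 85,334 bytes.)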
assert.eq(
    bucketsClosedDueToCachePressure, cardinalityForCachePressure, formatStatsLog(timeseriesStats));

// We expect the number of compressed buckets to double (independent of whether the buckets were
// closed due to size or cache pressure).
assert.eq(compressedBuckets, 2 * cardinalityForCachePressure, formatStatsLog(timeseriesStats));

replSet.stopSet();
})();