1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
/**
* Validate the state of time-series collections after inserting measurements into reopened buckets.
*
* We set the 'timeseriesIdleBucketExpiryMemoryUsageThreshold' to a low value and configure the
* 'alwaysUseSameBucketCatalogStripe' failpoint to expedite bucket closures and increase the number
* of buckets we reopen to insert into.
*
* @tags: [requires_replication]
*/
(function() {
"use strict";

load("jstests/core/timeseries/libs/timeseries.js");  // For 'TimeseriesTest'.
load("jstests/libs/fail_point_util.js");             // For 'configureFailPoint'.

// Single-node replica set started with a low idle-bucket expiry memory threshold so that buckets
// are closed (and later reopened) quickly.
const rst = new ReplSetTest({nodes: 1});
rst.startSet({setParameter: {timeseriesIdleBucketExpiryMemoryUsageThreshold: 1024}});
rst.initiate();

const db = rst.getPrimary().getDB(jsTestName());
assert.commandWorked(db.dropDatabase());

if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
    rst.stopSet();
    jsTestLog(
        'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
    return;
}

// Plain string prefix for collection names; the running test-case id is appended to it so that
// every test case operates on its own collection.
const collNamePrefix = 'validate_timeseries_bucket_reopening_';
const timeFieldName = 'time';
const metaFieldName1 = 'meta';
const metaFieldName2 = 'tag';
const valueFieldName = 'value';
let testCaseId = 0;

/**
 * Creates a fresh time-series collection, inserts measurements into it, checks the time-series
 * stats and finally runs validate on the collection, asserting that it reports no problems.
 *
 * @param {?string} metaFieldName - Name of the metaField to create the collection with. When
 *     null/omitted, the collection is created without a metaField and the inserted measurements
 *     carry no metadata.
 */
const validateBucketReopening = function(metaFieldName = null) {
    const hasMetaField = metaFieldName != null;

    // Create collection with metaField passed in.
    const timeseriesOptions = {timeField: timeFieldName};
    if (hasMetaField) {
        timeseriesOptions.metaField = metaFieldName;
    }

    jsTestLog("Running validateBucketReopening() with timeseriesOptions = " +
              tojson(timeseriesOptions));

    const coll = db.getCollection(collNamePrefix + testCaseId++);
    const bucketsColl = db.getCollection('system.buckets.' + coll.getName());
    coll.drop();
    assert.commandWorked(db.createCollection(coll.getName(), {timeseries: timeseriesOptions}));
    assert.contains(bucketsColl.getName(), db.getCollectionNames());

    // Insert documents with varying metaField values (if the 'metaFieldName' is specified).
    // Cycling through the meta values on every pass means each meta value is revisited after the
    // other ones have been written.
    const distinctMetaValues = 10;
    const numOfPasses = 100;
    for (let i = 0; i < numOfPasses; ++i) {
        for (let j = 0; j < distinctMetaValues; ++j) {
            // Fields are added in the order time, meta (optional), value.
            const measurement = {[timeFieldName]: ISODate()};
            if (hasMetaField) {
                measurement[metaFieldName] = j;
            }
            measurement[valueFieldName] = "a";
            assert.commandWorked(coll.insert(measurement));
        }
    }

    // Verify we inserted measurements into reopened buckets through the time-series stats.
    const stats = assert.commandWorked(coll.stats());
    assert(stats.timeseries, "Missing time-series stats: " + tojson(stats));
    const statsMsg = "Timeseries stats: " + tojson(stats);
    if (hasMetaField) {
        // The idea of this test is to insert measurements into reopened buckets so we are looking
        // to reopen more buckets than we create.
        assert.lt(
            stats.timeseries['bucketCount'], stats.timeseries['numBucketsReopened'], statsMsg);
        assert.lt(stats.timeseries['numBucketsOpenedDueToMetadata'],
                  stats.timeseries['numBucketsReopened'],
                  statsMsg);

        // The number of bucket inserts should be less than the number of bucket updates.
        assert.lt(
            stats.timeseries['numBucketInserts'], stats.timeseries['numBucketUpdates'], statsMsg);
    } else {
        // When no metaField is specified, all measurements fit in one bucket since there is no
        // need to close buckets due to metadata.
        assert.eq(stats.timeseries['bucketCount'], 1, statsMsg);
    }

    // Finally, validate the collection and ensure there are no inconsistencies. The full validate
    // result is attached to every assertion so a failure shows what validate reported.
    const res = assert.commandWorked(coll.validate());
    const validateMsg = "Validate result: " + tojson(res);
    assert.eq(res.valid, true, validateMsg);
    assert.eq(res.nNonCompliantDocuments, 0, validateMsg);
    assert.eq(res.nInvalidDocuments, 0, validateMsg);
    assert.eq(res.errors.length, 0, validateMsg);
    assert.eq(res.warnings.length, 0, validateMsg);
};

// Activate failpoint to place all buckets in the same stripe in the BucketCatalog.
const fpSameStripe = configureFailPoint(db, "alwaysUseSameBucketCatalogStripe");

// Validate results with no metaField.
validateBucketReopening();

// Validate results with metaField == 'meta'.
validateBucketReopening(metaFieldName1);

// Validate results with metaField == 'tag'.
validateBucketReopening(metaFieldName2);

fpSameStripe.off();
rst.stopSet();
})();
|