1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
/**
* Tests the behavior of the $_internalBoundedSort stage.
* @tags: [
* # TODO SERVER-52286 should be requires_fcv_60
* requires_fcv_53,
* # Cannot insert into a time-series collection in a multi-document transaction.
* does_not_support_transactions,
* # Refusing to run a test that issues an aggregation command with explain because it may return
* # incomplete results if interrupted by a stepdown.
* does_not_support_stepdowns,
* ]
*/
(function() {
"use strict";

load('jstests/libs/analyze_plan.js');
load("jstests/core/timeseries/libs/timeseries.js");

// Nothing to verify on servers where the feature flag is off.
if (!TimeseriesTest.bucketUnpackWithSortEnabled(db.getMongo())) {
    jsTestLog("Skipping test because 'BucketUnpackWithSort' is disabled.");
    return;
}

const coll = db.timeseries_internal_bounded_sort;
const bucketsColl = db['system.buckets.' + coll.getName()];

coll.drop();
assert.commandWorked(
    db.createCollection(coll.getName(), {timeseries: {timeField: 't', metaField: 'm'}}));

// The bound handed to $_internalBoundedSort below must match the bucket
// span the server actually configured for this collection.
const bucketMaxSpanSeconds =
    db.getCollectionInfos({name: coll.getName()})[0].options.timeseries.bucketMaxSpanSeconds;

// Populate the collection with evenly spaced events, one per second.
{
    const numBatches = 10;
    const batchSize = 1000;
    const intervalMillis = 1000;  // 1 second
    const origin = new Date();
    for (let i = 0; i < numBatches; ++i) {
        const docs = [];
        for (let j = 0; j < batchSize; ++j) {
            docs.push(
                {t: new Date(+origin + i * batchSize * intervalMillis + j * intervalMillis)});
        }
        assert.commandWorked(coll.insert(docs));
        print(`Inserted ${i + 1} of ${numBatches} batches`);
    }
    // The interesting behavior only arises once the data spans several buckets.
    assert.gt(bucketsColl.aggregate([{$count: 'n'}]).next().n, 1, 'Expected more than one bucket');
}

// An index on time lets us scan the buckets in time order.
// TODO SERVER-60824 use the $natural / _id index instead.
assert.commandWorked(coll.createIndex({t: 1}));

// Pull the $_internalUnpackBucket stage the server would generate for this
// collection, so the pipelines below can apply it to the buckets directly.
const unpackStage = getAggPlanStage(coll.explain().aggregate(), '$_internalUnpackBucket');

// Asserts that every document in `result` is in non-decreasing 't' order.
function assertSorted(result) {
    for (let idx = 1; idx < result.length; ++idx) {
        const prev = result[idx - 1];
        const doc = result[idx];
        assert.lte(+prev.t, +doc.t, 'Found two docs not in time order: ' + tojson({prev, doc}));
    }
}

// Whole-collection sort: the bounded sort must agree with a naive blocking
// $sort placed after unpacking (optimization inhibited so it stays blocking).
{
    const naive = bucketsColl
                      .aggregate([
                          unpackStage,
                          {$_internalInhibitOptimization: {}},
                          {$sort: {t: 1}},
                      ])
                      .toArray();
    assertSorted(naive);

    const opt = bucketsColl
                    .aggregate([
                        {$sort: {'control.min.t': 1}},
                        unpackStage,
                        {
                            $_internalBoundedSort: {
                                sortKey: {t: 1},
                                bound: bucketMaxSpanSeconds,
                            }
                        },
                    ])
                    .toArray();
    assertSorted(opt);

    assert.eq(naive, opt);
}

// Same comparison with a $limit appended to both pipelines.
{
    const naive = bucketsColl
                      .aggregate([
                          unpackStage,
                          {$_internalInhibitOptimization: {}},
                          {$sort: {t: 1}},
                          {$limit: 100},
                      ])
                      .toArray();
    assertSorted(naive);
    assert.eq(100, naive.length);

    const opt = bucketsColl
                    .aggregate([
                        {$sort: {'control.min.t': 1}},
                        unpackStage,
                        {
                            $_internalBoundedSort: {
                                sortKey: {t: 1},
                                bound: bucketMaxSpanSeconds,
                            }
                        },
                        {$limit: 100},
                    ])
                    .toArray();
    assertSorted(opt);
    assert.eq(100, opt.length);

    assert.eq(naive, opt);
}
})();
|