summaryrefslogtreecommitdiff
path: root/jstests/core/timeseries/bucket_unpacking_with_sort_plan_cache.js
blob: b42711b72f70cbc71a25a143cee56094f7e8ad82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
 * Test that the bucket unpacking with sorting rewrite is performed when plan is cached or
 * replanned.
 *
 * @tags: [
 *     # Plan cache stats doesn't support different read concerns.
 *     assumes_read_concern_unchanged,
 *     # Explain of a resolved view must be executed by mongos.
 *     directly_against_shardsvrs_incompatible,
 *     # This complicates aggregation extraction.
 *     do_not_wrap_aggregations_in_facets,
 *     # Refusing to run a test that issues an aggregation command with explain because it may
 *     # return incomplete results if interrupted by a stepdown.
 *     does_not_support_stepdowns,
 *     # We use the profiler to get info in order to force replanning.
 *     requires_profiling,
 *     # We need a timeseries collection.
 *     requires_timeseries,
 *     # Plan cache state is node-local and will not get migrated alongside tenant data.
 *     tenant_migration_incompatible,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/profiler.js");                    // For getLatestProfilerEntry.
load("jstests/libs/fixture_helpers.js");             // For FixtureHelpers.
load("jstests/libs/analyze_plan.js");                // For planHasStage.
load("jstests/core/timeseries/libs/timeseries.js");  // For TimeseriesTest.

// Bail out early on builds where the bounded-sort rewrite is disabled;
// none of the assertions below would hold without it.
if (!TimeseriesTest.bucketUnpackWithSortEnabled(db.getMongo())) {
    jsTestLog("Skipping test because 'BucketUnpackWithSort' is disabled.");
    return;
}

// The three meta subfields every inserted document carries.
const fields = ["a", "b", "i"];

// Bulk-inserts `numDocs` measurements into `coll`. `m.a` and `m.b` take the
// two values in `constants`; `m.i` and the time field `t` both take the loop
// index, so `m.i` spreads the documents across separate buckets.
const addDocs = (coll, numDocs, constants = [1, 1]) => {
    assert.eq(fields.length, 3, fields);
    assert.eq(constants.length, 2, constants);

    const [aField, bField, iField] = fields;
    const [aValue, bValue] = constants;

    const bulk = coll.initializeUnorderedBulkOp();
    for (let i = 0; i < numDocs; ++i) {
        bulk.insert({m: {[aField]: aValue, [bField]: bValue, [iField]: i}, t: new Date(i)});
    }
    assert.commandWorked(bulk.execute());
};
// Drops and recreates `collName` as a timeseries collection, then populates it
// with `numDocs` documents via addDocs.
const setupCollection = (coll, collName, numDocs) => {
    coll.drop();
    // Check the result (consistent with the rest of this test): a silent
    // creation failure would otherwise surface as confusing errors later.
    assert.commandWorked(
        db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}}));

    addDocs(coll, numDocs);
};

const collName = jsTestName();
const coll = db[collName];
// Timeseries collections are views backed by a system.buckets collection;
// plan cache entries are stored on the buckets collection, not the view.
const bucketsName = "system.buckets." + collName;
// The stage the bucket-unpacking-with-sort rewrite is expected to produce.
const stageName = "$_internalBoundedSort";
const bucketsColl = db[bucketsName];

// Runs the full plan-cache lifecycle for one (sort direction, index direction)
// pair: verifies the $_internalBoundedSort rewrite is present before the plan
// is cached, after it is cached, and again after replanning is triggered.
const testBoundedSorterPlanCache = (sortDirection, indexDirection) => {
    // Setup with a few documents.
    const numDocs = 20;
    setupCollection(coll, collName, numDocs);

    // Two competing indexes, so the multi-planner runs and caches a winner.
    assert.commandWorked(
        coll.createIndex({"m.a": indexDirection, "m.i": indexDirection, t: indexDirection}));
    assert.commandWorked(
        coll.createIndex({"m.b": indexDirection, "m.i": indexDirection, t: indexDirection}));

    // Check that the rewrite is performed before caching.
    const pipeline =
        [{$sort: {"m.i": sortDirection, t: sortDirection}}, {$match: {"m.a": 1, "m.b": 1}}];
    let explain = coll.explain().aggregate(pipeline);
    assert.eq(getAggPlanStages(explain, stageName).length, 1, explain);
    // The index is scanned forward when its direction matches the sort,
    // backward otherwise.
    const traversalDirection = sortDirection === indexDirection ? "forward" : "backward";
    assert.eq(getAggPlanStage(explain, "IXSCAN").direction, traversalDirection, explain);

    // Check the cache is empty.
    assert.eq(bucketsColl.getPlanCache().list().length, 0);

    // Run in order to cache the plan.
    let result = coll.aggregate(pipeline).toArray();
    // Every document inserted by setupCollection matches {m.a: 1, m.b: 1}.
    assert.eq(result.length, numDocs, result);

    // Check the answer was cached.
    assert.eq(bucketsColl.getPlanCache().list().length, 1);

    // Check that the solution still uses internal bounded sort with the correct order.
    explain = coll.explain().aggregate(pipeline);
    assert.eq(getAggPlanStages(explain, stageName).length, 1, explain);
    assert.eq(getAggPlanStage(explain, "IXSCAN").direction, traversalDirection, explain);

    // Get constants needed for replanning.
    const cursorStageName = "$cursor";
    const planCacheKey =
        getPlanCacheKeyFromExplain(getAggPlanStage(explain, cursorStageName)[cursorStageName], db);
    const planCacheEntry = (() => {
        const planCache = bucketsColl.getPlanCache().list([{$match: {planCacheKey}}]);
        assert.eq(planCache.length, 1, planCache);
        return planCache[0];
    })();
    let ratio = (() => {
        const getParamRes = assert.commandWorked(
            db.adminCommand({getParameter: 1, internalQueryCacheEvictionRatio: 1}));
        return getParamRes["internalQueryCacheEvictionRatio"];
    })();

    // Remove existing docs, add docs to trigger replanning: the cached plan is
    // evicted once a query does more than ratio * works of the cached entry.
    assert.commandWorked(coll.deleteMany({"m.a": 1, "m.b": 1}));
    let numNewDocs = ratio * planCacheEntry.works + 1;
    addDocs(coll, numNewDocs, [1, 0]);
    addDocs(coll, numNewDocs, [0, 1]);

    // Turn on profiling so the replan can be observed in the profiler output.
    db.setProfilingLevel(2);

    // Rerun command with replanning.
    const comment = jsTestName();
    result = coll.aggregate(pipeline, {comment}).toArray();
    // No remaining document has both m.a: 1 and m.b: 1.
    assert.eq(result.length, 0);

    // Check that the plan was replanned.
    const replanProfileEntry = getLatestProfilerEntry(db, {'command.comment': comment});
    assert(replanProfileEntry.replanned, replanProfileEntry);

    // Check that rewrite happens with replanning.
    explain = coll.explain().aggregate(pipeline);
    assert.eq(getAggPlanStages(explain, stageName).length,
              1,
              {explain, stages: getAggPlanStages(explain, stageName)});
    assert.eq(getAggPlanStage(explain, "IXSCAN").direction, traversalDirection, explain);
};

for (const sortDirection of [-1, 1]) {
    for (const indexDirection of [-1, 1]) {
        testBoundedSorterPlanCache(sortDirection, indexDirection);
    }
}
})();