summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/lookup_with_limit_sharded.js
blob: 6846db7f0f8729039689692b216dba4cfab76a5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
 * Tests that the $limit stage is pushed before $lookup stages, except when there is an $unwind.
 * This will be run against a sharded cluster, which invalidates the disablePipelineOptimization
 * failpoints that the standalone 'lookup_with_limit' tests use.
 *
 * For an unsharded collection, the result of 'explain()' is matched against the expected order of
 * stages. For a sharded collection, the 'getAggPlanStages()' function is used to
 * check whether $limit was reordered.
 *
 * @tags: [
 *   requires_replication,
 *   requires_sharding,
 * ]
 */
(function() {
load("jstests/libs/analyze_plan.js");  // For getAggPlanStages().
load("jstests/libs/sbe_util.js");      // For checkSBEEnabled.

const st = new ShardingTest({shards: 2, config: 1});
const db = st.s.getDB("test");

// The plan expectations below (e.g. the EQ_LOOKUP plan stage) assume $lookup runs in SBE. Stop the
// cluster before bailing out so it is not leaked.
if (!checkSBEEnabled(db)) {
    jsTestLog("Skipping test because SBE $lookup is not enabled.");
    st.stop();
    return;
}

const coll = db.lookup_with_limit;
const other = db.lookup_with_limit_other;
coll.drop();
other.drop();

/**
 * Explains 'pipeline' against the unsharded 'coll' and checks the optimized plan.
 *
 * @param {Object[]} pipeline - the aggregation pipeline to explain.
 * @param {string[]} expectedPlanStages - the winning query plan, as flattened by
 *     flattenQueryPlanTree(), i.e. the stages pushed down to the query system.
 * @param {string[]} expectedPipeline - names of the stages expected to remain in the aggregation
 *     pipeline after the $cursor stage; [] when the whole pipeline is pushed down.
 */
function checkUnshardedResults(pipeline, expectedPlanStages, expectedPipeline) {
    // Guard against passing a bare string such as "COLLSCAN": a one-element array loosely equals
    // its string form in JS ("==" coerces it), so such a mistake could go unnoticed.
    assert(Array.isArray(expectedPlanStages),
           `expectedPlanStages must be an array, got ${tojson(expectedPlanStages)}`);
    assert(Array.isArray(expectedPipeline),
           `expectedPipeline must be an array, got ${tojson(expectedPipeline)}`);

    const explain = coll.explain().aggregate(pipeline);
    if (explain.stages) {
        // Only part of the pipeline was pushed down: stage 0 is the $cursor stage carrying the
        // query plan, and the remaining stages still run in the aggregation pipeline.
        const queryStages =
            flattenQueryPlanTree(getWinningPlan(explain.stages[0].$cursor.queryPlanner));
        const pipelineStages = explain.stages.slice(1).map(s => Object.keys(s)[0]);
        assert.eq(queryStages, expectedPlanStages, explain);
        assert.eq(pipelineStages, expectedPipeline, explain);
    } else {
        // The whole pipeline was pushed down, so the explain has a top-level queryPlanner and no
        // pipeline stages remain.
        const queryStages = flattenQueryPlanTree(getWinningPlan(explain.queryPlanner));
        assert.eq(queryStages, expectedPlanStages, explain);
        assert.eq([], expectedPipeline, explain);
    }
}

/**
 * Explains 'pipeline' against the sharded 'coll' and checks how many LIMIT plan stages
 * getAggPlanStages() finds in the explain output, i.e. whether the $limit was pushed down to the
 * query system.
 *
 * @param {Object[]} pipeline - the aggregation pipeline to explain.
 * @param {number} expected - the expected number of LIMIT plan stages.
 */
function checkShardedResults(pipeline, expected) {
    const limitStages = getAggPlanStages(coll.explain().aggregate(pipeline), "LIMIT");
    assert.eq(limitStages.length, expected, limitStages);
}

// Insert ten documents into coll: {x: 0}, {x: 1}, ..., {x: 9}.
const bulk = coll.initializeOrderedBulkOp();
Array.from({length: 10}, (_, i) => ({x: i})).forEach(doc => bulk.insert(doc));
assert.commandWorked(bulk.execute());

// Insert twenty documents into other: {x: 0, y: 0}, {x: 0, y: 1}, ..., {x: 9, y: 0}, {x: 9, y: 1}.
const bulk_other = other.initializeOrderedBulkOp();
Array.from({length: 10}, (_, i) => ({x: i, y: 0})).forEach(doc => bulk_other.insert(doc));
Array.from({length: 10}, (_, i) => ({x: i, y: 1})).forEach(doc => bulk_other.insert(doc));
assert.commandWorked(bulk_other.execute());

// Tests on an unsharded collection.

// Check that lookup->limit is reordered to limit->lookup, with the limit stage pushed down to query
// system.
const lookupPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$limit: 5}
];
checkUnshardedResults(lookupPipeline, ["COLLSCAN", "LIMIT", "EQ_LOOKUP"], []);

// Check that lookup->addFields->lookup->limit is reordered to limit->lookup->addFields->lookup,
// with the limit stage pushed down to query system.
const multiLookupPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$addFields: {z: 0}},
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "additional"}},
    {$limit: 5}
];
checkUnshardedResults(
    multiLookupPipeline, ["COLLSCAN", "LIMIT", "EQ_LOOKUP"], ["$addFields", "$lookup"]);

// Check that lookup->unwind->limit becomes lookup->limit: the unwind stage is absorbed into the
// lookup stage, which prevents the limit from swapping before it.
const unwindPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$unwind: "$from_other"},
    {$limit: 5}
];
checkUnshardedResults(unwindPipeline, ["COLLSCAN"], ["$lookup", "$limit"]);

// Check that lookup->unwind->sort->limit becomes lookup->sort: the unwind stage is absorbed into the
// lookup stage (preventing the limit from swapping before it), and the limit stage is absorbed into
// the sort stage.
const sortPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$unwind: "$from_other"},
    {$sort: {x: 1}},
    {$limit: 5}
];
checkUnshardedResults(sortPipeline, ["COLLSCAN"], ["$lookup", "$sort"]);

// Check that sort->lookup->limit is reordered to sort->lookup, with the limit stage being absorbed
// into the sort stage and creating a top-k sort, which is pushed down to query system.
const topKSortPipeline = [
    {$sort: {x: 1}},
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$limit: 5}
];
checkUnshardedResults(topKSortPipeline, ["COLLSCAN", "SORT", "EQ_LOOKUP"], []);
// The whole pipeline was pushed down (checked above), so the plan is under the top-level
// queryPlanner; the SORT stage must carry the absorbed limit.
const topKSortExplain = coll.explain().aggregate(topKSortPipeline);
assert.eq(getPlanStage(getWinningPlan(topKSortExplain.queryPlanner), "SORT").limitAmount,
          5,
          topKSortExplain);

// Tests on a sharded collection.
assert.commandWorked(coll.createIndex({x: 1}));
st.shardColl(coll, {x: 1}, {x: 1}, {x: 1}, db, true);

// Where the limit is pushed down we expect two LIMIT stages (presumably one per shard); where the
// unwind blocks the swap, or the sort absorbs the limit, no LIMIT stage is expected.
checkShardedResults(lookupPipeline, 2);
checkShardedResults(multiLookupPipeline, 2);
checkShardedResults(unwindPipeline, 0);
checkShardedResults(sortPipeline, 0);
checkShardedResults(topKSortPipeline, 2);

st.stop();
}());