summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/lookup_with_limit_sharded.js
blob: 19011f6d8073fbb5e51501bbfb3664a16e1bd5c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
 * Tests that the $limit stage is pushed before $lookup stages, except when there is an $unwind.
 * This will be run against a sharded cluster, which invalidates the disablePipelineOptimization
 * failpoints that the standalone 'lookup_with_limit' tests use.
 *
 * For an unsharded collection, the result of 'explain()' is matched against the expected order of
 * stages. For a sharded collection, the 'getAggPlanStages()' function is used to
 * check whether $limit was reordered.
 *
 * @tags: [
 *   requires_replication,
 *   requires_sharding,
 * ]
 */
(function() {
load("jstests/libs/analyze_plan.js");  // For getAggPlanStages().
load("jstests/libs/sbe_util.js");      // For checkSBEEnabled.

const st = new ShardingTest({shards: 2, config: 1});
const db = st.s.getDB("test");

// TODO SERVER-64614 Update test cases for the SBE $lookup and remove 'if' block.
if (checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) {
    jsTestLog("Skipping test because SBE and SBE $lookup features are both enabled.");
    st.stop();
    return;
}

const coll = db.lookup_with_limit;
const other = db.lookup_with_limit_other;
coll.drop();
other.drop();

// Checks that the order of the pipeline stages matches the expected optimized ordering for an
// unsharded collection.
function checkUnshardedResults(pipeline, expectedPlanStage, expectedPipeline) {
    const explain = coll.explain().aggregate(pipeline);
    assert.eq(
        getWinningPlan(explain.stages[0].$cursor.queryPlanner).stage, expectedPlanStage, explain);
    for (let i = 0; i < expectedPipeline.length; i++) {
        assert.eq(Object.keys(explain.stages[i + 1]), expectedPipeline[i], explain);
    }
}

// Checks that the expected stages are pushed down to the query system for a sharded collection.
function checkShardedResults(pipeline, expected) {
    const limitStages = getAggPlanStages(coll.explain().aggregate(pipeline), "LIMIT");
    assert.eq(limitStages.length, expected, limitStages);
}

// Insert ten documents into coll: {x: 0}, {x: 1}, ..., {x: 9}.
const bulk = coll.initializeOrderedBulkOp();
Array.from({length: 10}, (_, i) => ({x: i})).forEach(doc => bulk.insert(doc));
assert.commandWorked(bulk.execute());

// Insert twenty documents into other: {x: 0, y: 0}, {x: 0, y: 1}, ..., {x: 9, y: 0}, {x: 9, y: 1}.
const bulk_other = other.initializeOrderedBulkOp();
Array.from({length: 10}, (_, i) => ({x: i, y: 0})).forEach(doc => bulk_other.insert(doc));
Array.from({length: 10}, (_, i) => ({x: i, y: 1})).forEach(doc => bulk_other.insert(doc));
assert.commandWorked(bulk_other.execute());

// Tests on an unsharded collection.

// Check that lookup->limit is reordered to limit->lookup, with the limit stage pushed down to query
// system.
const lookupPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$limit: 5}
];
checkUnshardedResults(lookupPipeline, "LIMIT", ["$lookup"]);

// Check that lookup->addFields->lookup->limit is reordered to limit->lookup->addFields->lookup,
// with the limit stage pushed down to query system.
const multiLookupPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$addFields: {z: 0}},
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "additional"}},
    {$limit: 5}
];
checkUnshardedResults(multiLookupPipeline, "LIMIT", ["$lookup", "$addFields", "$lookup"]);

// Check that lookup->unwind->limit is reordered to lookup->limit, with the unwind stage being
// absorbed into the lookup stage and preventing the limit from swapping before it.
const unwindPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$unwind: "$from_other"},
    {$limit: 5}
];
checkUnshardedResults(unwindPipeline, "COLLSCAN", ["$lookup", "$limit"]);

// Check that lookup->unwind->sort->limit is reordered to lookup->sort, with the unwind stage being
// absorbed into the lookup stage and preventing the limit from swapping before it, and the limit
// stage being absorbed into the sort stage.
const sortPipeline = [
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$unwind: "$from_other"},
    {$sort: {x: 1}},
    {$limit: 5}
];
checkUnshardedResults(sortPipeline, "COLLSCAN", ["$lookup", "$sort"]);

// Check that sort->lookup->limit is reordered to sort->lookup, with the limit stage being absorbed
// into the sort stage and creating a top-k sort, which is pushed down to query system.
const topKSortPipeline = [
    {$sort: {x: 1}},
    {$lookup: {from: other.getName(), localField: "x", foreignField: "x", as: "from_other"}},
    {$limit: 5}
];
checkUnshardedResults(topKSortPipeline, "SORT", ["$lookup"]);
const explain = coll.explain().aggregate(topKSortPipeline);
assert.eq(getWinningPlan(explain.stages[0].$cursor.queryPlanner).limitAmount, 5, explain);

// Tests on a sharded collection.
coll.createIndex({x: 1});
st.shardColl(coll, {x: 1}, {x: 1}, {x: 1}, db, true);

checkShardedResults(lookupPipeline, 2);
checkShardedResults(multiLookupPipeline, 2);
checkShardedResults(unwindPipeline, 0);
checkShardedResults(sortPipeline, 0);
checkShardedResults(topKSortPipeline, 2);

st.stop();
}());