summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/spill_to_disk_secondary_read.js
blob: dedffcbf5d12ef3b5285794c849194cede5a554d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
 * Test that $group and $setWindowFields spill to the WT RecordStore on secondaries with
 * writeConcern greater than w:1.
 * @tags: [requires_replication, requires_majority_read_concern]
 */
(function() {
"use strict";

load("jstests/libs/sbe_explain_helpers.js");  // For getSbePlanStages.
load("jstests/libs/sbe_util.js");             // For checkSBEEnabled.

const kNumNodes = 3;
const replTest = new ReplSetTest({
    nodes: kNumNodes,
});

replTest.startSet();
replTest.initiate();

/**
 * Setup the primary and secondary collections.
 */
let primary = replTest.getPrimary();
let bulk = primary.getDB("test").foo.initializeUnorderedBulkOp();
const cRecords = 50;
for (let i = 0; i < cRecords; ++i) {
    // We'll be using a unique 'key' field for group & lookup, but we cannot use '_id' for this,
    // because '_id' is indexed and would trigger Indexed Loop Join instead of Hash Join.
    bulk.insert({key: i, string: "test test test"});
}
assert.commandWorked(bulk.execute({w: kNumNodes, wtimeout: 5000}));

let secondary = replTest.getSecondary();
// Wait for the insertion to be visible on 'secondary'.
replTest.awaitLastOpCommitted(null, [secondary]);
const readColl = secondary.getDB("test").foo;

/**
 * Test spilling of $group, when explicitly run on a secondary.
 */
(function testGroupSpilling() {
    if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBEGroupPushdown"])) {
        jsTestLog("Skipping test for HashAgg stage: $group lowering into SBE isn't enabled");
        return;
    }

    // Set memory limit so low that HashAgg has to spill all records it processes.
    const oldSetting =
        assert
            .commandWorked(secondary.adminCommand({
                setParameter: 1,
                internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: 1
            }))
            .was;
    try {
        // The pipeline is silly -- because '$key' contains unique values, it will "group" exactly
        // one record into each bucket and push a single '$string' value -- but it allows us to be
        // more deterministic with spilling behaviour: each input record would create a record
        // inside 'HashAgg' and because of the low memory limit, all of them will have to be
        // spilled. For the spilled bytes, sanity test that the number is "reasonably large".
        const pipeline = [{$group: {_id: '$key', s: {$push: '$string'}}}];
        const expectedSpilledRecords = cRecords;
        const expectedSpilledBytesAtLeast = cRecords * 10;

        // Sanity check that the operation fails if cannot use disk and is successful otherwise.
        const aggCommand = {
            pipeline: pipeline,
            allowDiskUse: false,
            readConcern: {level: "majority"},
            writeConcern: {"w": "majority"},
            cursor: {}
        };
        assert.commandFailedWithCode(readColl.runCommand("aggregate", aggCommand),
                                     ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed);
        let aggOptions = {
            allowDiskUse: true,
            readConcern: {level: "majority"},
            writeConcern: {"w": "majority"}
        };
        const res = readColl.aggregate(pipeline, aggOptions).toArray();
        assert.eq(res.length, cRecords);  // the group-by key is unique

        // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the
        // 'writeConcern' option so we test spilling on the secondary but without using the concern.
        const explainRes =
            readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
        const hashAggGroups = getSbePlanStages(explainRes, 'group');
        assert.eq(hashAggGroups.length, 1, explainRes);
        const hashAggGroup = hashAggGroups[0];
        assert(hashAggGroup, explainRes);
        assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup);
        assert(hashAggGroup.usedDisk, hashAggGroup);
        assert.eq(hashAggGroup.spilledRecords, expectedSpilledRecords, hashAggGroup);
        // We expect each record to be individually spilled, so the number of spill events and the
        // number of spilled records should be equal.
        assert.eq(hashAggGroup.numSpills, hashAggGroup.spilledRecords, hashAggGroup);
        assert.gt(hashAggGroup.spilledDataStorageSize, expectedSpilledBytesAtLeast, hashAggGroup);
    } finally {
        assert.commandWorked(secondary.adminCommand({
            setParameter: 1,
            internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: oldSetting
        }));
    }
})();

/**
 * Test spilling of $lookup when explicitly run on a secondary.
 */
(function testLookupSpillingInSbe() {
    if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBELookupPushdown"])) {
        jsTestLog("Skipping test for HashLookup stage: $lookup lowering into SBE isn't enabled");
        return;
    }

    // Set memory limit so low that HashLookup has to spill all records it processes.
    const oldSetting =
        assert
            .commandWorked(secondary.adminCommand({
                setParameter: 1,
                internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1
            }))
            .was;
    try {
        // The pipeline is silly -- because '$key' contains unique values, it will self-join each
        // record with itself and nothing else -- but it allows us to be more deterministic with
        // the spilling behaviour. For the spilled bytes, sanity test that the number is "reasonably
        // large".
        const pipeline = [{
            $lookup:
                {from: readColl.getName(), localField: "key", foreignField: "key", as: "results"}
        }];
        const expectedSpilledRecordsAtLeast = cRecords;
        const expectedSpilledBytesAtLeast = cRecords * 20;

        // Sanity check that the operation is successful. Note: we cannot test the operation to fail
        // with 'allowDiskUse' set to "false" because that would block HashJoin and fall back to NLJ
        // which doesn't spill.
        let aggOptions = {
            allowDiskUse: true,
            readConcern: {level: "majority"},
            writeConcern: {"w": "majority"}
        };
        const res = readColl.aggregate(pipeline, aggOptions).toArray();
        assert.eq(res.length, cRecords);  // the key for self-join is unique

        // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the
        // 'writeConcern' option so we test spilling on the secondary but without using the concern.
        const explainRes =
            readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});

        const hLookups = getSbePlanStages(explainRes, 'hash_lookup');
        assert.eq(hLookups.length, 1, explainRes);
        const hLookup = hLookups[0];
        assert(hLookup, explainRes);
        assert(hLookup.hasOwnProperty("usedDisk"), hLookup);
        assert(hLookup.usedDisk, hLookup);
        assert.gte(hLookup.spilledRecords, expectedSpilledRecordsAtLeast, hLookup);
        assert.gte(hLookup.spilledBytesApprox, expectedSpilledBytesAtLeast, hLookup);
    } finally {
        assert.commandWorked(secondary.adminCommand({
            setParameter: 1,
            internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: oldSetting
        }));
    }
})();

/**
 * Test spilling of $setWindowFields. We only check that the operation is successful. Main tests for
 * $setWindowFields can be found in jstests/aggregation/sources/setWindowFields/spill_to_disk.js.
 */
(function testSetWindowFields() {
    // Test that spilling '$setWindowFields' pipeline on a secondary works with a writeConcern
    // greater than w:1.
    let avgDocSize = 274;
    let smallPartitionSize = 6;
    let largePartitionSize = 21;
    const insertCollWFs = primary.getDB("test").bar;

    // Create small partition.
    for (let i = 0; i < smallPartitionSize; i++) {
        assert.commandWorked(insertCollWFs.insert({_id: i, val: i, partition: 1}));
    }
    // Create large partition.
    for (let i = 0; i < largePartitionSize; i++) {
        assert.commandWorked(
            insertCollWFs.insert({_id: i + smallPartitionSize, val: i, partition: 2}));
    }

    assert.commandWorked(secondary.adminCommand({
        setParameter: 1,
        internalDocumentSourceSetWindowFieldsMaxMemoryBytes: largePartitionSize * avgDocSize + 1
    }));

    const readCollWFs = secondary.getDB("test").bar;

    let pipeline = [
        {
            $setWindowFields: {
                partitionBy: "$partition",
                sortBy: {partition: 1},
                output: {arr: {$push: "$val", window: {documents: [-25, 25]}}}
            }
        },
        {$sort: {_id: 1}}
    ];

    let res = readCollWFs
                  .aggregate(pipeline, {
                      allowDiskUse: true,
                      readConcern: {level: "majority"},
                      writeConcern: {"w": "majority"}
                  })
                  .toArray();
})();

replTest.stopSet();
})();