summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/sbe/stages/spool.h
blob: 995b10076a066ee5ed7e20a6a16b5171d5a7a9ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
/**
 *    Copyright (C) 2020-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/stages/stages.h"

namespace mongo::sbe {
/**
 * Spool stage that runs in 'Eager' producer mode.
 *
 * During 'open()' this stage drains its child completely, storing every row it reads into a
 * shared 'SpoolBuffer'. Subsequent 'getNext()' calls serve rows out of that buffer rather than
 * asking the child to produce them again.
 *
 * Any number of consumer spools may be attached to the same 'SpoolBuffer'. This stage is the one
 * responsible for filling the buffer; once it has been populated, each consumer reads it through
 * a read pointer of its own.
 */
class SpoolEagerProducerStage final : public PlanStage {
public:
    SpoolEagerProducerStage(std::unique_ptr<PlanStage> input,
                            SpoolId spoolId,
                            value::SlotVector vals,
                            PlanNodeId planNodeId);

    // Lifecycle.
    void prepare(CompileCtx& ctx) final;
    void open(bool reOpen) final;
    PlanState getNext() final;
    void close() final;

    // Slot access and cloning.
    value::SlotAccessor* getAccessor(CompileCtx& ctx, value::SlotId slot) final;
    std::unique_ptr<PlanStage> clone() const final;

    // Introspection.
    std::unique_ptr<PlanStageStats> getStats(bool includeDebugInfo) const final;
    const SpecificStats* getSpecificStats() const final;
    std::vector<DebugPrinter::Block> debugPrint() const final;

private:
    // Buffer shared with the consumer spools of the same '_spoolId'; starts out null.
    std::shared_ptr<SpoolBuffer> _buffer;
    // Read position within '_buffer' (presumably the row index '_outAccessors' expose).
    size_t _bufferIt = 0;
    const SpoolId _spoolId;

    // Slots read from the child and re-exposed to the parent.
    const value::SlotVector _vals;
    std::vector<value::SlotAccessor*> _inAccessors;
    value::SlotMap<value::MaterializedRowAccessor<SpoolBuffer>> _outAccessors;
};

/**
 * Spool stage that runs in 'Lazy' producer mode.
 *
 * Unlike the 'Eager' producer spool, 'open()' does _not_ read from the child or fill the buffer.
 * Instead, each 'getNext()' call reads the next input, stores it into the shared 'SpoolBuffer',
 * and immediately returns it to the caller stage.
 *
 * Any number of consumer spools may be attached to the same 'SpoolBuffer'. This stage fills the
 * buffer incrementally as described above, while consumers read from it (possibly while it is
 * still being filled), each through a read pointer of its own.
 *
 * An optional predicate can be supplied to filter the input so that only a portion of it is
 * stored into the buffer. Input rejected by the predicate is still passed through to the caller;
 * it is simply not stored.
 */
class SpoolLazyProducerStage final : public PlanStage {
public:
    SpoolLazyProducerStage(std::unique_ptr<PlanStage> input,
                           SpoolId spoolId,
                           value::SlotVector vals,
                           std::unique_ptr<EExpression> predicate,
                           PlanNodeId planNodeId);

    // Lifecycle.
    void prepare(CompileCtx& ctx) final;
    void open(bool reOpen) final;
    PlanState getNext() final;
    void close() final;

    // Slot access and cloning.
    value::SlotAccessor* getAccessor(CompileCtx& ctx, value::SlotId slot) final;
    std::unique_ptr<PlanStage> clone() const final;

    // Introspection.
    std::unique_ptr<PlanStageStats> getStats(bool includeDebugInfo) const final;
    const SpecificStats* getSpecificStats() const final;
    std::vector<DebugPrinter::Block> debugPrint() const final;

private:
    // Buffer shared with the consumer spools of the same '_spoolId'; starts out null.
    std::shared_ptr<SpoolBuffer> _buffer;
    const SpoolId _spoolId;

    // Slots read from the child and re-exposed to the parent.
    const value::SlotVector _vals;
    std::vector<value::SlotAccessor*> _inAccessors;
    value::SlotMap<value::ViewOfValueAccessor> _outAccessors;

    // Optional filter deciding which inputs are stored, together with the state presumably used
    // to compile and evaluate it.
    std::unique_ptr<EExpression> _predicate;
    std::unique_ptr<vm::CodeFragment> _predicateCode;
    vm::ByteCode _bytecode;
    bool _compiled = false;
};

/**
 * A read-only Spool PlanStage. It has no input PlanStage and never adds data to its 'SpoolBuffer';
 * instead it reads and returns rows from a shared 'SpoolBuffer' that is populated by another
 * (producer) spool stage with the same spool id.
 *
 * The 'IsStack' template parameter selects one of two modes:
 *
 * - Consumer mode ('IsStack == false', stage name "cspool"): each call to 'getNext' advances this
 *   stage's own read pointer through the buffer, starting at the first element, and returns EOF
 *   once the pointer reaches the end of the buffer. Rows are never removed from the buffer.
 *
 * - Stack mode ('IsStack == true', stage name "sspool"): meant to be used together with a 'Lazy'
 *   producer spool. On each call to 'getNext' the stage first deletes from the buffer the row it
 *   returned on the previous call, then moves its read pointer to the last element in the buffer
 *   and returns it. EOF is returned once the buffer is empty.
 *
 *   Because a stack spool always returns the most recently added row, it does not return rows in
 *   the order in which they were added. For example, the lazy spool can add values [1,2,3]; the
 *   stack spool then reads and deletes 3; then two more values can be added, giving [1,2,4,5];
 *   the stack spool then reads and deletes 5, and so on.
 */
template <bool IsStack>
class SpoolConsumerStage final : public PlanStage {
public:
    SpoolConsumerStage(SpoolId spoolId, value::SlotVector vals, PlanNodeId planNodeId)
        : PlanStage{IsStack ? "sspool"_sd : "cspool"_sd, planNodeId},
          _spoolId{spoolId},
          _vals{std::move(vals)} {}

    std::unique_ptr<PlanStage> clone() const final {
        return std::make_unique<SpoolConsumerStage<IsStack>>(_spoolId, _vals, _commonStats.nodeId);
    }

    /**
     * Looks up the shared buffer for '_spoolId' (only if it has not been resolved yet) and creates
     * one output accessor per slot in '_vals'. The i-th slot reads column i of the buffer row
     * selected by '_bufferIt' (the accessor presumably tracks '_bufferIt' by reference).
     *
     * Throws (uassert 4822809) if '_vals' contains the same slot more than once.
     */
    void prepare(CompileCtx& ctx) final {
        if (!_buffer) {
            _buffer = ctx.getSpoolBuffer(_spoolId);
        }

        value::SlotSet dupCheck;
        size_t counter = 0;

        for (auto slot : _vals) {
            auto [it, inserted] = dupCheck.insert(slot);
            uassert(4822809, str::stream() << "duplicate field: " << slot, inserted);

            _outAccessors.emplace(
                slot, value::MaterializedRowAccessor<SpoolBuffer>{*_buffer, _bufferIt, counter++});
        }
    }

    /**
     * Returns this stage's accessor for 'slot' if it is one of '_vals'; otherwise defers to 'ctx'.
     */
    value::SlotAccessor* getAccessor(CompileCtx& ctx, value::SlotId slot) final {
        if (auto it = _outAccessors.find(slot); it != _outAccessors.end()) {
            return &it->second;
        }

        return ctx.getAccessor(slot);
    }

    /**
     * Places the read pointer one past the end of the buffer. 'getNext' interprets this position
     * as "no row returned yet".
     */
    void open(bool reOpen) final {
        auto optTimer(getOptTimer(_opCtx));

        _commonStats.opens++;
        _bufferIt = _buffer->size();
    }

    PlanState getNext() final {
        auto optTimer(getOptTimer(_opCtx));

        if constexpr (IsStack) {
            // A read pointer inside the buffer marks the row returned by the previous call, which
            // has now been consumed, so remove it. This relies on other stages only appending to
            // the buffer, so that the row's index is still valid.
            if (_bufferIt != _buffer->size()) {
                _buffer->erase(_buffer->begin() + _bufferIt);
            }

            if (_buffer->size() == 0) {
                return trackPlanState(PlanState::IS_EOF);
            }

            // Always serve the most recently added row.
            _bufferIt = _buffer->size() - 1;
        } else {
            // '_bufferIt == size()' means nothing has been returned since 'open', so start at the
            // first row; otherwise step to the next one.
            // NOTE(review): this sentinel assumes the buffer does not grow between 'open' and the
            // first 'getNext'; if it does, the scan starts at index (old size + 1) instead of 0.
            // Likewise, a call made after EOF (pointer == size()) restarts the scan at row 0.
            if (_bufferIt == _buffer->size()) {
                _bufferIt = 0;
            } else {
                ++_bufferIt;
            }

            if (_bufferIt == _buffer->size()) {
                return trackPlanState(PlanState::IS_EOF);
            }
        }
        return trackPlanState(PlanState::ADVANCED);
    }

    void close() final {
        auto optTimer(getOptTimer(_opCtx));

        _commonStats.closes++;
    }

    /**
     * Returns the common stats; with 'includeDebugInfo', also reports the spool id and the output
     * slots.
     */
    std::unique_ptr<PlanStageStats> getStats(bool includeDebugInfo) const final {
        auto ret = std::make_unique<PlanStageStats>(_commonStats);

        if (includeDebugInfo) {
            BSONObjBuilder bob;
            bob.appendIntOrLL("spoolId", _spoolId);
            bob.append("outputSlots", _vals);
            ret->debugInfo = bob.obj();
        }

        return ret;
    }

    // This stage has no stage-specific stats.
    const SpecificStats* getSpecificStats() const final {
        return nullptr;
    }

    /**
     * Prints the stage name, the spool id, and the bracketed, comma-separated list of output slots.
     */
    std::vector<DebugPrinter::Block> debugPrint() const final {
        auto ret = PlanStage::debugPrint();

        DebugPrinter::addSpoolIdentifier(ret, _spoolId);

        ret.emplace_back(DebugPrinter::Block("[`"));
        for (size_t idx = 0; idx < _vals.size(); ++idx) {
            if (idx) {
                ret.emplace_back(DebugPrinter::Block("`,"));
            }

            DebugPrinter::addIdentifier(ret, _vals[idx]);
        }
        ret.emplace_back(DebugPrinter::Block("`]"));

        return ret;
    }

private:
    // Buffer shared with the producer spool of the same '_spoolId'; resolved in 'prepare'.
    std::shared_ptr<SpoolBuffer> _buffer{nullptr};
    // This stage's own read pointer into '_buffer'; '_outAccessors' read the row at this index.
    size_t _bufferIt{0};
    const SpoolId _spoolId;

    const value::SlotVector _vals;
    // One accessor per slot in '_vals', reading the corresponding column of the current row.
    value::SlotMap<value::MaterializedRowAccessor<SpoolBuffer>> _outAccessors;
};
}  // namespace mongo::sbe