summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_densify.h
blob: b6a2ddf35281b59c761239e42204fd40f3ec8a48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
/**
 *    Copyright (C) 2021-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_densify_gen.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/memory_usage_tracker.h"
#include "mongo/db/query/datetime/date_time_support.h"
#include "mongo/util/overloaded_visitor.h"
#include "mongo/util/time_support.h"


namespace mongo {

// Forward declaration: DensifyValue's step helpers below refer to RangeStatement, which is
// defined after this class.
class RangeStatement;

/**
 * A value of the field being densified. Internally a variant holding exactly one of:
 *   - a 'Value'  (reported by isNumber()), or
 *   - a 'Date_t' (reported by isDate()).
 *
 * Values extracted with getFromDocument() are guaranteed to be numeric or a date.
 */
class DensifyValue {
public:
    // Delegate to the zero-argument constructor for stdx::variant<T>. This constructor is needed
    // for DensifyValue to be able to be the value type in a ValueUnorderedMap.
    // NOTE(review): assuming stdx::variant follows std::variant, the default state holds a
    // default-constructed 'Value', so isNumber() reports true for it -- confirm.
    DensifyValue() : _value() {}

    // The converting constructors are intentionally implicit: getFromDocument() relies on them
    // when returning a 'Value' or a 'Date_t'.
    DensifyValue(Value val) : _value(std::move(val)) {}
    DensifyValue(Date_t date) : _value(date) {}

    /**
     * Returns the time zone associated with this value, which is always the UTC zone.
     */
    TimeZone timezone() const {
        return TimeZoneDatabase::utcZone();
    }

    /**
     * Convert a DensifyValue into a Value for use in documents/serialization.
     */
    Value toValue() const {
        return stdx::visit(OverloadedVisitor{[](const Value& val) { return val; },
                                             [](Date_t dateVal) { return Value(dateVal); }},
                           _value);
    }

    /**
     * Compare two DensifyValues using the standard comparator convention, returning the sign
     * of (lhs - rhs). Returns -1 if lhs < rhs, 0 if lhs == rhs, and 1 if lhs > rhs.
     *
     * Both arguments must hold the same alternative (see isSameTypeAs()): the right-hand side is
     * unwrapped with stdx::get<> using the left-hand side's type, which fails if they differ
     * (presumably by throwing std::bad_variant_access, as std::get does -- TODO confirm).
     */
    static int compare(const DensifyValue& lhs, const DensifyValue& rhs) {
        return stdx::visit(
            OverloadedVisitor{[&](const Value& lhsVal) {
                                  const Value& rhsVal = stdx::get<Value>(rhs._value);
                                  return Value::compare(lhsVal, rhsVal, nullptr);
                              },
                              [&](Date_t lhsVal) {
                                  const Date_t rhsVal = stdx::get<Date_t>(rhs._value);
                                  return Value::compare(Value(lhsVal), Value(rhsVal), nullptr);
                              }},
            lhs._value);
    }

    /**
     * Get the value to be densified from a document. The function checks the type to ensure that
     * the document has either a numeric value or a date in the proper field, and throws an error
     * (code 5733201) otherwise.
     */
    static DensifyValue getFromDocument(const Document& doc, const FieldPath& path) {
        Value val = doc.getNestedField(path);
        uassert(5733201,
                "Densify field type must be numeric or a date",
                val.numeric() || val.getType() == BSONType::Date);
        if (!val.numeric()) {
            // Not numeric, so the uassert above guarantees this is a date.
            return val.getDate();
        }
        return DensifyValue(std::move(val));
    }

    /**
     * Returns a string rendering of the held value, delegating to the toString() of whichever
     * alternative is stored.
     */
    std::string toString() const {
        return stdx::visit(OverloadedVisitor{[](const Value& v) { return v.toString(); },
                                             [](Date_t d) { return d.toString(); }},
                           _value);
    }

    /**
     * Returns a new DensifyValue incremented by the step in the provided range.
     */
    DensifyValue increment(const RangeStatement& range) const;

    /**
     * Returns a new DensifyValue decremented by the step in the provided range.
     */
    DensifyValue decrement(const RangeStatement& range) const;

    /**
     * Delegate to Value::getApproximateSize(). A date is wrapped in a temporary Value first.
     */
    size_t getApproximateSize() const {
        return stdx::visit(
            OverloadedVisitor{[](const Value& v) { return v.getApproximateSize(); },
                              [](Date_t d) { return Value(d).getApproximateSize(); }},
            _value);
    }

    /**
     * Returns true if this DensifyValue is a date.
     */
    bool isDate() const {
        return stdx::holds_alternative<Date_t>(_value);
    }

    /**
     * Returns the DensifyValue as a date. Trips tassert 5733701 if this is not a date.
     */
    Date_t getDate() const {
        tassert(5733701, "DensifyValue must be a date", isDate());
        return stdx::get<Date_t>(_value);
    }

    /**
     * Returns true if this DensifyValue is a number, i.e. it holds the 'Value' alternative.
     */
    bool isNumber() const {
        return stdx::holds_alternative<Value>(_value);
    }

    /**
     * Returns the DensifyValue as a number in a Value. Trips tassert 5733702 if this is not a
     * number.
     */
    Value getNumber() const {
        tassert(5733702, "DensifyValue must be a number", isNumber());
        return stdx::get<Value>(_value);
    }

    /**
     * Returns true if this DensifyValue and the other one are both the same underlying type.
     */
    bool isSameTypeAs(const DensifyValue& other) const {
        return (this->isDate() && other.isDate()) || (this->isNumber() && other.isNumber());
    }

    /**
     * Checks if the value is exactly on the step defined in the given RangeStatement
     * relative to the provided base value.
     */
    bool isOnStepRelativeTo(DensifyValue base, RangeStatement range) const;

    /**
     * Comparison operator overloads. All of them are defined in terms of compare(), so the same
     * "both sides hold the same alternative" requirement applies.
     */
    bool operator==(const DensifyValue& rhs) const {
        return compare(*this, rhs) == 0;
    }

    bool operator!=(const DensifyValue& rhs) const {
        return !(*this == rhs);
    }

    bool operator<(const DensifyValue& rhs) const {
        return compare(*this, rhs) < 0;
    }

    bool operator>(const DensifyValue& rhs) const {
        return compare(*this, rhs) > 0;
    }

    bool operator>=(const DensifyValue& rhs) const {
        return !(*this < rhs);
    }

    bool operator<=(const DensifyValue& rhs) const {
        return !(*this > rhs);
    }

    /**
     * Stream pipe operator for debug purposes.
     */
    friend std::ostream& operator<<(std::ostream& os, const DensifyValue& val) {
        os << val.toString();
        return os;
    }

private:
    // The held value: 'Value' for the numeric case, 'Date_t' for the date case.
    stdx::variant<Value, Date_t> _value;
};
/**
 * The parsed 'range' argument of a densify stage: a step, the bounds to densify over, and an
 * optional time unit for the step.
 */
class RangeStatement {
public:
    // Field names used when serializing the range specification.
    static constexpr StringData kArgUnit = "unit"_sd;
    static constexpr StringData kArgBounds = "bounds"_sd;
    static constexpr StringData kArgStep = "step"_sd;

    // String forms of the two non-explicit bounds kinds.
    static constexpr StringData kValFull = "full"_sd;
    static constexpr StringData kValPartition = "partition"_sd;

    // Tag types for the "full" and "partition" bounds kinds; they carry no data.
    struct Full {};
    struct Partition {};
    // A pair of bounds values; serialized as a two-element array [first, second].
    using ExplicitBounds = std::pair<DensifyValue, DensifyValue>;
    using Bounds = stdx::variant<Full, Partition, ExplicitBounds>;

    /**
     * Returns a copy of the bounds.
     */
    Bounds getBounds() const {
        return _bounds;
    }

    /**
     * Returns the time unit of the step, or boost::none if no unit was given.
     */
    boost::optional<TimeUnit> getUnit() const {
        return _unit;
    }

    /**
     * Returns the step.
     */
    Value getStep() const {
        return _step;
    }

    RangeStatement(Value step, Bounds bounds, boost::optional<TimeUnit> unit)
        : _step(std::move(step)), _bounds(std::move(bounds)), _unit(unit) {}

    /**
     * Builds a RangeStatement from a RangeSpec. Defined out of line.
     */
    static RangeStatement parse(RangeSpec spec);

    /**
     * Serializes this range as a document Value with the fields:
     *   - "step":   the step, passed through opts.serializeLiteralValue().
     *   - "bounds": "full", "partition", or a two-element array of the explicit bounds.
     *   - "unit":   only present when a unit is set.
     */
    Value serialize(SerializationOptions opts) const {
        MutableDocument spec;
        spec[kArgStep] = opts.serializeLiteralValue(_step);
        spec[kArgBounds] = stdx::visit(
            OverloadedVisitor{[&](Full) { return Value(kValFull); },
                              [&](Partition) { return Value(kValPartition); },
                              [&](const ExplicitBounds& bounds) {
                                  return Value(std::vector<Value>(
                                      {opts.serializeLiteralValue(bounds.first.toValue()),
                                       opts.serializeLiteralValue(bounds.second.toValue())}));
                              }},
            _bounds);
        if (_unit) {
            spec[kArgUnit] = opts.serializeLiteralValue(serializeTimeUnit(*_unit));
        }
        return spec.freezeToValue();
    }

private:
    Value _step;
    Bounds _bounds;
    boost::optional<TimeUnit> _unit = boost::none;
};

// Free functions that build the list of pipeline stages for a densify specification.
namespace document_source_densify {
// Name of the user-facing stage.
constexpr StringData kStageName = "$densify"_sd;

/**
 * Builds the stage(s) for the specification in 'elem', reporting them under 'stageName'. The
 * result is a list because more than one DocumentSource may be produced.
 *
 * The 'isInternal' parameter specifies whether or not we create a sort stage that is required for
 * correct execution of an _internalDensify stage.
 */
std::list<boost::intrusive_ptr<DocumentSource>> createFromBsonInternal(
    BSONElement elem,
    const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
    StringData stageName,
    bool isInternal);
// Builds the stage(s) for the specification in 'elem'.
// NOTE(review): presumably the '$densify' entry point that forwards to createFromBsonInternal()
// with kStageName -- confirm against the definition.
std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
    BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);

/**
 * Builds the stage(s) from already-parsed arguments: the partition fields, the field to densify
 * and the range.
 *
 * The 'isInternal' parameter specifies whether or not we create a sort stage that is required for
 * correct execution of an _internalDensify stage.
 */
std::list<boost::intrusive_ptr<DocumentSource>> create(
    const boost::intrusive_ptr<ExpressionContext>& expCtx,
    std::list<FieldPath> partitions,
    FieldPath field,
    RangeStatement rangeStatement,
    bool isInternal);
}  // namespace document_source_densify

/**
 * The '$_internalDensify' pipeline stage. Going by the helpers declared below, it generates
 * documents over a range of values of '_field' (stepping by '_range'), optionally tracking the
 * last value seen per partition, and passes input documents through.
 */
class DocumentSourceInternalDensify final : public DocumentSource {
public:
    static constexpr StringData kStageName = "$_internalDensify"_sd;
    static constexpr StringData kPartitionByFieldsFieldName = "partitionByFields"_sd;
    static constexpr StringData kFieldFieldName = "field"_sd;
    static constexpr StringData kRangeFieldName = "range"_sd;

    /**
     * Constructs the stage from parsed arguments. The arguments are copied into the stage. The
     * memory and generated-document limits are read once here from
     * 'internalDocumentSourceDensifyMaxMemoryBytes' and 'internalQueryMaxAllowedDensifyDocs'.
     * '_partitionExpr' is not set by this constructor.
     */
    DocumentSourceInternalDensify(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
                                  const FieldPath& field,
                                  const std::list<FieldPath>& partitions,
                                  const RangeStatement& range)
        : DocumentSource(kStageName, pExpCtx),
          // The parameters are const references, so these are copies; std::move would be a
          // no-op here and is deliberately not used.
          _field(field),
          _partitions(partitions),
          _range(range),
          _partitionTable(pExpCtx->getValueComparator().makeUnorderedValueMap<DensifyValue>()),
          _memTracker(
              MemoryUsageTracker(false, internalDocumentSourceDensifyMaxMemoryBytes.load())) {
        _maxDocs = internalQueryMaxAllowedDensifyDocs.load();
    }

    /**
     * Produces the generated documents one at a time via getNextDocument() until done().
     */
    class DocGenerator {
    public:
        DocGenerator(DensifyValue current,
                     RangeStatement range,
                     FieldPath fieldName,
                     boost::optional<Document> includeFields,
                     boost::optional<Document> finalDoc,
                     ValueComparator comp,
                     size_t* counter);
        Document getNextDocument();
        bool done() const;

    private:
        ValueComparator _comp;
        RangeStatement _range;
        // The field to add to 'includeFields' to generate a document.
        FieldPath _path;
        Document _includeFields;
        // The document that is equal to or larger than the upper bound that prompted the
        // creation of this generator. Will be returned after the final generated document. Can
        // be boost::none if we are generating the values at the end of the range.
        boost::optional<Document> _finalDoc;
        // The minimum value that this generator will create, therefore the next generated
        // document will have this value.
        DensifyValue _min;

        enum class GeneratorState {
            // Generating documents between '_min' and the upper bound.
            kGeneratingDocuments,
            // Generated all necessary documents, waiting for a final 'getNextDocument()' call.
            kReturningFinalDocument,
            kDone,
        };

        GeneratorState _state = GeneratorState::kGeneratingDocuments;
        // Value to increment when returning a generated document. This is a pointer to the counter
        // that keeps track of the total number of documents generated by the owning stage across
        // all generators.
        size_t* _counter;
    };

    // Alternative constructor taking a numeric step, the field path and optional explicit
    // bounds. Defined out of line.
    DocumentSourceInternalDensify(
        const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
        double step,
        FieldPath path,
        boost::optional<std::pair<DensifyValue, DensifyValue>> range = boost::none);

    static boost::intrusive_ptr<DocumentSourceInternalDensify> create(
        const boost::intrusive_ptr<ExpressionContext>& pExpCtx);

    static boost::intrusive_ptr<DocumentSource> createFromBson(
        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);

    /**
     * Streaming stage with no position or host requirement and no disk use; allowed inside
     * $facet, transactions, $lookup and $unionWith. 'pipeState' is not consulted.
     */
    StageConstraints constraints(Pipeline::SplitState pipeState) const final {
        return {StreamType::kStreaming,
                PositionRequirement::kNone,
                HostTypeRequirement::kNone,
                DiskUseRequirement::kNoDiskUse,
                FacetRequirement::kAllowed,
                TransactionRequirement::kAllowed,
                LookupRequirement::kAllowed,
                UnionRequirement::kAllowed};
    }

    const char* getSourceName() const final {
        return kStageName.rawData();
    }

    Value serialize(SerializationOptions opts = SerializationOptions()) const final;

    /**
     * Reports the densified field and every partition field as dependencies.
     */
    DepsTracker::State getDependencies(DepsTracker* deps) const final {
        deps->fields.insert(_field.fullPath());
        // We don't need to traverse _partitionExpr because it was generated from _partitions.
        // Every ExpressionFieldPath it contains is already covered by _partitions.
        for (const auto& field : _partitions) {
            deps->fields.insert(field.fullPath());
        }
        return DepsTracker::State::SEE_NEXT;
    }

    void addVariableRefs(std::set<Variables::Id>* refs) const final {
        // The partition expression cannot refer to any variables because it is internally generated
        // based on a set of field paths.
    }

    // NOTE(review): presumably "no shards part, run this stage on the merging side, no merge
    // sort pattern" -- confirm against the field order of DistributedPlanLogic.
    boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
        return DistributedPlanLogic{nullptr, this, boost::none};
    }

    GetNextResult doGetNext() final;

protected:
    Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
                                                     Pipeline::SourceContainer* container) final;

private:
    // Where a value lies relative to the range; see getPositionRelativeToRange().
    enum class ValComparedToRange {
        kBelow,
        kRangeMin,
        kInside,
        kAbove,
    };

    /**
     * Extracts the densify value of 'doc' at '_field' and checks it agrees with the range: a date
     * is required when the range has a unit, a number when it does not. Throws 6053600 otherwise
     * (and 5733201, from DensifyValue::getFromDocument, if the field is neither).
     */
    DensifyValue getDensifyValue(const Document& doc) {
        auto val = DensifyValue::getFromDocument(doc, _field);
        uassert(6053600,
                val.isNumber()
                    ? "Encountered numeric densify value in collection when step has a date unit."
                    : "Encountered date densify value in collection when step does not have a date "
                      "unit.",
                (!_range.getUnit() && val.isNumber()) || (_range.getUnit() && val.isDate()));
        return val;
    }

    /**
     * Evaluates '_partitionExpr' against 'doc' to obtain its partition key. '_partitionExpr' is
     * dereferenced unconditionally, so it must be non-null when this is called.
     */
    Value getDensifyPartition(const Document& doc) {
        auto part = _partitionExpr->evaluate(doc, &pExpCtx->variables);
        return part;
    }

    /**
     * Decides whether or not to build a DocGen and return the first document generated or return
     * the current doc if the rangeMin + step is greater than rangeMax. Used for both 'full' and
     * 'partition' bounds.
     */
    DocumentSource::GetNextResult handleNeedGen(Document currentDoc);

    /**
     * Checks where the current doc's value lies compared to the range and creates the correct
     * DocGen if needed and returns the next doc.
     */
    DocumentSource::GetNextResult handleNeedGenExplicit(Document currentDoc);

    /**
     * Takes care of when an EOF has been hit for the explicit case. It checks if we have finished
     * densifying over the range, and if so changes the state to be kDensifyDone. Otherwise it
     * builds a new generator that will finish densifying over the range and changes the state to
     * kHaveGenerator. Only used if the input is not partitioned.
     */
    DocumentSource::GetNextResult densifyExplicitRangeAfterEOF();

    /**
     * Decide what to do for the first document in a given partition for explicit range. Either
     * generate documents between the minimum and the value, or just return it.
     */
    DocumentSource::GetNextResult processFirstDocForExplicitRange(Document doc);

    /**
     * Creates a document generator based on the value passed in, the current _current, and the
     * ExplicitBounds on the stage. Once created, the state changes to kHaveGenerator and the first
     * document from the generator is returned.
     */
    DocumentSource::GetNextResult processDocAboveExplicitMinBound(Document doc);

    /**
     * Takes in a value and checks if the value is below, on the bottom, inside, or above the
     * range, and returns the equivalent state from ValComparedToRange.
     */
    ValComparedToRange getPositionRelativeToRange(DensifyValue val);

    /**
     * Handles when the pSource has been exhausted. In the full case we are done with the
     * densification process and the state becomes kDensifyDone, however in the explicit case we
     * may still need to densify over the remainder of the range, so the
     * densifyExplicitRangeAfterEOF() function is called.
     */
    DocumentSource::GetNextResult handleSourceExhausted();

    /**
     * Handles building a document generator once we've seen an EOF for partitioned input. Min will
     * be the last seen value in the partition unless it is less than the optional 'minOverride'.
     * Helper is to share code between visit functions.
     */
    DocumentSource::GetNextResult finishDensifyingPartitionedInput();
    DocumentSource::GetNextResult finishDensifyingPartitionedInputHelper(
        DensifyValue max, boost::optional<DensifyValue> minOverride = boost::none);

    /**
     * Checks if the current document generator is done. If it is and we have finished densifying,
     * it changes the state to be kDensifyDone. If there is more to densify, the state becomes
     * kNeedGen. The generator is also deleted.
     */
    void resetDocGen(RangeStatement::ExplicitBounds bounds);

    /**
     * Set up the state for densifying over partitions.
     */
    void initializePartitionState(Document initialDoc);

    /**
     * Helper to set the value in the partition table. Does nothing when there is no partition
     * expression. Otherwise records the densify value of 'doc' as the latest value for its
     * partition, adjusting the memory tracker first, and throws 6007200 if the memory limit is
     * exceeded (in which case the table is left unmodified).
     */
    void setPartitionValue(Document doc) {
        if (!_partitionExpr) {
            return;
        }

        const auto partitionKey = getDensifyPartition(doc);
        const auto partitionVal = getDensifyValue(doc);

        // Sizes are unsigned; do the arithmetic in a signed type so that replacing a value with a
        // smaller one yields a negative delta instead of relying on unsigned wrap-around.
        const auto newValSize = static_cast<long long>(partitionVal.getApproximateSize());
        const auto lastValForPartitionIt = _partitionTable.find(partitionKey);
        if (lastValForPartitionIt == _partitionTable.end()) {
            // If this is a new partition, store the size of the key and the value.
            _memTracker.update(static_cast<long long>(partitionKey.getApproximateSize()) +
                               newValSize);
        } else {
            // Subtract the size of the previous value and add the new one.
            _memTracker.update(
                newValSize -
                static_cast<long long>(lastValForPartitionIt->second.getApproximateSize()));
        }
        uassert(6007200,
                str::stream() << "$densify exceeded memory limit of "
                              << _memTracker._maxAllowedMemoryUsageBytes,
                _memTracker.withinMemoryLimit());

        _partitionTable[partitionKey] = partitionVal;
    }

    /**
     * Helpers to create doc generators. Sets _docGenerator to the created generator. The
     * generator is given '_field', the expression context's value comparator, and a pointer to
     * '_docsGenerated' as its counter.
     */
    void createDocGenerator(DensifyValue min,
                            RangeStatement range,
                            boost::optional<Document> includeFields,
                            boost::optional<Document> finalDoc) {
        // The by-value arguments are not used again, so hand them over instead of copying.
        _docGenerator = DocGenerator(std::move(min),
                                     std::move(range),
                                     _field,
                                     std::move(includeFields),
                                     std::move(finalDoc),
                                     pExpCtx->getValueComparator(),
                                     &_docsGenerated);
    }
    void createDocGenerator(DensifyValue min, RangeStatement range) {
        createDocGenerator(std::move(min), std::move(range), boost::none, boost::none);
    }

    Pipeline::SourceContainer::iterator combineSorts(Pipeline::SourceContainer::iterator itr,
                                                     Pipeline::SourceContainer* container);

    // The active document generator, if any.
    boost::optional<DocGenerator> _docGenerator = boost::none;

    /**
     * The last value seen or generated by the stage that is also in line with the step.
     */
    boost::optional<DensifyValue> _current = boost::none;

    // Used to keep track of the bounds for densification in the full case.
    boost::optional<DensifyValue> _globalMin = boost::none;
    boost::optional<DensifyValue> _globalMax = boost::none;

    // _partitionExpr has two purposes:
    // 1. to determine which partition a document belongs in.
    // 2. to initialize new documents with the right partition key.
    // For example, if the stage had 'partitionByFields: ["a", "x.y"]' then this expression
    // would be {a: "$a", {x: {y: "$x.y"}}}.
    boost::intrusive_ptr<ExpressionObject> _partitionExpr;

    bool _eof = false;

    enum class DensifyState {
        kUninitializedOrBelowRange,
        kNeedGen,
        kHaveGenerator,
        kFinishingDensify,
        kDensifyDone
    };

    DensifyState _densifyState = DensifyState::kUninitializedOrBelowRange;
    FieldPath _field;
    std::list<FieldPath> _partitions;
    RangeStatement _range;
    // Store of the value we've seen for each partition.
    ValueUnorderedMap<DensifyValue> _partitionTable;

    // Keep track of documents generated, error if it goes above the limit.
    size_t _docsGenerated = 0;
    size_t _maxDocs = 0;
    MemoryUsageTracker _memTracker;
};
}  // namespace mongo