summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_densify.cpp
blob: f035cff11f7eb5262a3b848c45c312be6b97b870 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
/**
 *    Copyright (C) 2021-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/pipeline/document_source_densify.h"
#include "mongo/base/exact_cast.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/query/sort_pattern.h"
#include "mongo/stdx/variant.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/overloaded_visitor.h"

using boost::intrusive_ptr;
using boost::optional;
using std::list;
using SortPatternPart = mongo::SortPattern::SortPatternPart;
using ExplicitBounds = mongo::RangeStatement::ExplicitBounds;
using Full = mongo::RangeStatement::Full;
using Partition = mongo::RangeStatement::Partition;
using DensifyValue = mongo::DensifyValue;

namespace mongo {

RangeStatement RangeStatement::parse(RangeSpec spec) {
    Value step = spec.getStep();
    ValueComparator comp = ValueComparator();
    uassert(5733401,
            "The step parameter in a range statement must be a strictly positive numeric value",
            step.numeric() && comp.evaluate(step > Value(0)));

    optional<TimeUnit> unit = [&]() {
        if (auto unit = spec.getUnit()) {
            uassert(6586400,
                    "The step parameter in a range statement must be a whole number when "
                    "densifying a date range",
                    step.integral64Bit());
            return optional<TimeUnit>(parseTimeUnit(unit.value()));
        } else {
            return optional<TimeUnit>(boost::none);
        }
    }();

    Bounds bounds = [&]() {
        BSONElement bounds = spec.getBounds().getElement();
        switch (bounds.type()) {
            case mongo::Array: {
                std::vector<BSONElement> array = bounds.Array();

                uassert(5733403,
                        "A bounding array in a range statement must have exactly two elements",
                        array.size() == 2);
                uassert(5733402,
                        "A bounding array must be an ascending array of either two dates or two "
                        "numbers",
                        comp.evaluate(Value(array[0]) <= Value(array[1])));
                if (array[0].isNumber()) {
                    uassert(5733409, "Numeric bounds may not have unit parameter", !unit);
                    uassert(5733406,
                            "A bounding array must contain either both dates or both numeric types",
                            array[1].isNumber());
                    // If these values are types of different sizes, output type may not be
                    // intuitive.
                    uassert(5876900,
                            "Upper bound, lower bound, and step must all have the same type",
                            array[0].type() == array[1].type() &&
                                array[0].type() == step.getType());
                    return Bounds(std::pair<Value, Value>(Value(array[0]), Value(array[1])));
                } else if (array[0].type() == mongo::Date) {
                    uassert(5733405,
                            "A bounding array must contain either both dates or both numeric types",
                            array[1].type() == mongo::Date);
                    uassert(5733410, "A bounding array of dates must specify a unit", unit);
                    return Bounds(std::pair<Date_t, Date_t>(array[0].date(), array[1].date()));
                } else {
                    uasserted(5946800, "Explicit bounds must be numeric or dates");
                }
                MONGO_UNREACHABLE_TASSERT(5946801);
            }
            case mongo::String: {
                if (bounds.str() == kValFull)
                    return Bounds(Full());
                else if (bounds.str() == kValPartition)
                    return Bounds(Partition());
                else
                    uasserted(5946802,
                              str::stream() << "Bounds string must either be '" << kValFull
                                            << "' or '" << kValPartition << "'");
                MONGO_UNREACHABLE_TASSERT(5946803);
            }
            default:
                uasserted(5733404,
                          "The bounds in a range statement must be the string \'full\', "
                          "\'partition\', or an ascending array of two numbers or two dates");
        }
    }();

    RangeStatement range = RangeStatement(step, bounds, unit);
    return range;
}

REGISTER_DOCUMENT_SOURCE(densify,
                         LiteParsedDocumentSourceDefault::parse,
                         document_source_densify::createFromBson,
                         AllowedWithApiStrict::kAlways);

REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalDensify,
                                  LiteParsedDocumentSourceDefault::parse,
                                  DocumentSourceInternalDensify::createFromBson,
                                  true);

namespace document_source_densify {

list<intrusive_ptr<DocumentSource>> createFromBsonInternal(
    BSONElement elem,
    const intrusive_ptr<ExpressionContext>& expCtx,
    StringData stageName,
    bool isInternal) {
    uassert(ErrorCodes::FailedToParse,
            str::stream() << "The " << stageName << " stage specification must be an object, found "
                          << typeName(elem.type()),
            elem.type() == BSONType::Object);

    auto spec = DensifySpec::parse(IDLParserContext(stageName), elem.embeddedObject());
    auto rangeStatement = RangeStatement::parse(spec.getRange());

    list<FieldPath> partitions;
    if (spec.getPartitionByFields()) {
        auto partitionFields = (*spec.getPartitionByFields());
        for (auto& partitionField : partitionFields)
            partitions.push_back(FieldPath(partitionField));
    }

    FieldPath field = FieldPath(spec.getField());

    if (stdx::holds_alternative<RangeStatement::Partition>(rangeStatement.getBounds()) &&
        partitions.empty())
        uasserted(5733408,
                  "One cannot specify the bounds as 'partition' without specifying a non-empty "
                  "array of partitionByFields. You may have meant to specify 'full' bounds.");

    return create(std::move(expCtx),
                  std::move(partitions),
                  std::move(field),
                  std::move(rangeStatement),
                  isInternal);
}

list<intrusive_ptr<DocumentSource>> createFromBson(BSONElement elem,
                                                   const intrusive_ptr<ExpressionContext>& expCtx) {
    return createFromBsonInternal(elem, expCtx, kStageName, false);
}

SortPattern getSortPatternForDensify(RangeStatement rangeStatement,
                                     list<FieldPath> partitions,
                                     FieldPath field) {
    // Add partition fields to sort spec.
    std::vector<SortPatternPart> sortParts;
    // We do not add partitions to the sort spec if the range is "full".
    if (!stdx::holds_alternative<Full>(rangeStatement.getBounds())) {
        for (const auto& partition : partitions) {
            SortPatternPart part;
            part.fieldPath = partition.fullPath();
            sortParts.push_back(std::move(part));
        }
    }

    // Add field path to sort spec.
    SortPatternPart part;
    part.fieldPath = field.fullPath();
    sortParts.push_back(std::move(part));
    return SortPattern{sortParts};
}

list<intrusive_ptr<DocumentSource>> create(const intrusive_ptr<ExpressionContext>& expCtx,
                                           list<FieldPath> partitions,
                                           FieldPath field,
                                           RangeStatement rangeStatement,
                                           bool isInternal) {
    list<intrusive_ptr<DocumentSource>> results;

    // If we're creating an internal stage then we must not desugar and produce a sort stage in
    // addition.
    if (!isInternal) {
        auto sortPattern = getSortPatternForDensify(rangeStatement, partitions, field);
        // Constructing resulting stages.
        results.push_back(DocumentSourceSort::create(expCtx, sortPattern));
    }

    // Constructing resulting stages.
    results.push_back(
        make_intrusive<DocumentSourceInternalDensify>(expCtx, field, partitions, rangeStatement));

    return results;
}
}  // namespace document_source_densify

DocumentSourceInternalDensify::DocGenerator::DocGenerator(DensifyValue min,
                                                          RangeStatement range,
                                                          FieldPath fieldName,
                                                          boost::optional<Document> includeFields,
                                                          boost::optional<Document> finalDoc,
                                                          ValueComparator comp,
                                                          size_t* counter)
    : _comp(std::move(comp)),
      _range(std::move(range)),
      _path(std::move(fieldName.fullPath())),
      _finalDoc(std::move(finalDoc)),
      _min(std::move(min)),
      _counter(counter) {

    if (includeFields) {
        _includeFields = *includeFields;
        tassert(5733306,
                "DocGenerator cannot include field that is being densified",
                _includeFields.getNestedField(_path).missing());
    }


    // Traverse the preserved fields document to make sure we're not going through an array.
    auto traverseDoc = _includeFields;
    auto pathLength = _path.getPathLength();
    for (size_t i = 0; i < pathLength; ++i) {
        auto curVal = traverseDoc.getField(_path.getFieldName(i));
        uassert(5733307, "$densify cannot generate fields nested inside arrays", !curVal.isArray());
        if (curVal.isObject()) {
            traverseDoc = curVal.getDocument();
        } else {
            // Can't write to a field that has a non-object value as a path-prefix, as that would
            // overwrite data. We should only have a non-object at the end of the path.
            uassert(5733308,
                    "$densify cannot overwrite non-object values with objects",
                    i == pathLength - 1 || curVal.missing());
            break;
        }
    }

    tassert(
        5733305, "DocGenerator step must be positive", _comp.evaluate(_range.getStep() > Value(0)));

    tassert(5733700,
            "DocGenerator must be passed a range with ExplicitBounds",
            stdx::holds_alternative<ExplicitBounds>(_range.getBounds()));
    ExplicitBounds bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    tassert(5733304,
            "DocGenerator all values must be same type",
            bounds.first.isSameTypeAs(bounds.second) && bounds.first.isSameTypeAs(_min));

    if (_min.isDate()) {
        // Extra checks for date step + unit.
        tassert(5733501, "Unit must be specified with a date step", _range.getUnit());
        tassert(5733505,
                "Step must be a whole number for date densification",
                _range.getStep().integral64Bit());
    } else {
        tassert(5733506, "Unit must not be specified with non-date values", !_range.getUnit());
    }

    tassert(5733303, "DocGenerator min must be lower or equal to max", _min <= bounds.second);
}

Document DocumentSourceInternalDensify::DocGenerator::getNextDocument() {
    tassert(5733301,
            "Called DocGenerator::getNextDocument() but generator is done",
            _state != GeneratorState::kDone);
    if (_state == GeneratorState::kReturningFinalDocument) {
        _state = GeneratorState::kDone;
        // If _finalDoc is boost::none we can't be in this state.
        tassert(5832800, "DocGenerator expected _finalDoc, found boost::none", _finalDoc);
        return _finalDoc.value();
    }
    // Assume all types have been checked at this point and we are in a valid state.
    DensifyValue valueToAdd = _min;
    DensifyValue nextValue = _min.increment(_range);
    ExplicitBounds bounds = stdx::get<ExplicitBounds>(_range.getBounds());

    if (bounds.second <= nextValue) {
        _state = _finalDoc ? GeneratorState::kReturningFinalDocument : GeneratorState::kDone;
    }
    _min = nextValue;

    MutableDocument retDoc(_includeFields);
    retDoc.setNestedField(_path, valueToAdd.toValue());
    ++(*_counter);
    return retDoc.freeze();
}

bool DocumentSourceInternalDensify::DocGenerator::done() const {
    return _state == GeneratorState::kDone;
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::densifyExplicitRangeAfterEOF() {
    tassert(5734403,
            "Expected explicit range in order to densify after last document.",
            stdx::holds_alternative<ExplicitBounds>(_range.getBounds()));
    auto bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    // Once we have hit an EOF, if the last seen value (_current) plus the step is greater
    // than or equal to the rangeMax, that means we have finished densifying
    // over the explicit range so we just return an EOF. Otherwise, we finish
    // densifying over the rest of the range.
    if (!_current) {
        // We've seen no documents yet.
        auto lowerBound = bounds.first;
        _current = lowerBound;
        createDocGenerator(lowerBound,
                           RangeStatement(_range.getStep(),
                                          ExplicitBounds(bounds.first, bounds.second),
                                          _range.getUnit()));
    } else if (_current->increment(_range) >= bounds.second) {
        _densifyState = DensifyState::kDensifyDone;
        return DocumentSource::GetNextResult::makeEOF();
    } else {
        auto lowerBound = _current->increment(_range);
        createDocGenerator(lowerBound,
                           RangeStatement(_range.getStep(),
                                          ExplicitBounds(bounds.first, bounds.second),
                                          _range.getUnit()));
    }
    _densifyState = DensifyState::kHaveGenerator;
    auto generatedDoc = _docGenerator->getNextDocument();
    if (_docGenerator->done()) {
        _densifyState = DensifyState::kDensifyDone;
        _docGenerator = boost::none;
    }
    return generatedDoc;
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::processDocAboveExplicitMinBound(
    Document doc) {
    auto bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    auto val = getDensifyValue(doc);
    // If we are above the range, there must be more left to densify.
    // Otherwise the state would be kDoneDensify and this function would not be reached.
    tassert(8423306,
            "Cannot be in this state if _current is greater than the upper bound.",
            *_current < bounds.second);
    // _current is the last seen value, don't generate it again.
    DensifyValue lowerBound = _current->increment(_range);

    // If val is the next value to be generated, just return it.
    if (val == lowerBound) {
        setPartitionValue(doc);
        _current = lowerBound;
        return doc;
    }

    DensifyValue upperBound = (val < bounds.second) ? val : bounds.second;
    createDocGenerator(
        lowerBound,
        RangeStatement(_range.getStep(), ExplicitBounds(lowerBound, upperBound), _range.getUnit()),
        _partitionExpr ? boost::make_optional<Document>(getDensifyPartition(doc).getDocument())
                       : boost::none,
        doc);
    Document nextFromGen = _docGenerator->getNextDocument();
    _current = getDensifyValue(nextFromGen);
    _densifyState = DensifyState::kHaveGenerator;
    // If the doc generator is done it will be deleted and the state will be kNeedGen.
    resetDocGen(bounds);
    setPartitionValue(nextFromGen);
    return nextFromGen;
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::processFirstDocForExplicitRange(
    Document doc) {
    auto bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    auto val = getDensifyValue(doc);
    // For the first document in a partition, '_current' is the minimum value - step.
    if (!_current) {
        _current = bounds.first.decrement(_range);
    }
    auto where = getPositionRelativeToRange(val);
    switch (where) {
        case ValComparedToRange::kInside: {
            return processDocAboveExplicitMinBound(doc);
        }
        case ValComparedToRange::kAbove: {
            return processDocAboveExplicitMinBound(doc);
        }
        case ValComparedToRange::kRangeMin: {
            _densifyState = DensifyState::kNeedGen;
            _current = val;
            return doc;
        }
        case ValComparedToRange::kBelow: {
            _densifyState = DensifyState::kUninitializedOrBelowRange;
            return doc;
        }
    }
    MONGO_UNREACHABLE_TASSERT(5733414);
    return DocumentSource::GetNextResult::makeEOF();
}

/** Checks if the generator is done, changes states accordingly. */
void DocumentSourceInternalDensify::resetDocGen(ExplicitBounds bounds) {
    if (_docGenerator->done()) {
        if (*_current >= bounds.second && !_partitionExpr) {
            _densifyState = DensifyState::kDensifyDone;
        } else if (_partitionExpr && _eof) {
            _densifyState = DensifyState::kFinishingDensify;
        } else {
            _densifyState = DensifyState::kNeedGen;
        }
        _docGenerator = boost::none;
    }
}

DocumentSourceInternalDensify::ValComparedToRange
DocumentSourceInternalDensify::getPositionRelativeToRange(DensifyValue val) {
    auto bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    int comparison = DensifyValue::compare(val, bounds.first);
    if (comparison < 0) {
        return DocumentSourceInternalDensify::ValComparedToRange::kBelow;
    } else if (comparison == 0) {
        return DocumentSourceInternalDensify::ValComparedToRange::kRangeMin;
    } else if (val < bounds.second) {
        return DocumentSourceInternalDensify::ValComparedToRange::kInside;
    } else {
        return DocumentSourceInternalDensify::ValComparedToRange::kAbove;
    }
}


DocumentSource::GetNextResult DocumentSourceInternalDensify::finishDensifyingPartitionedInputHelper(
    DensifyValue max, boost::optional<DensifyValue> minOverride) {
    while (_partitionTable.size() != 0) {
        auto firstPartitionKeyVal = _partitionTable.begin();
        Value firstPartition = firstPartitionKeyVal->first;
        DensifyValue firstPartitionVal = firstPartitionKeyVal->second;
        // We've already seen the stored value, we want to start generating on the next
        // one.
        auto valToGenerate = firstPartitionVal.increment(_range);
        // If the valToGenerate is > max seen, skip this partition. It is done.
        if (valToGenerate >= max) {
            _partitionTable.erase(firstPartitionKeyVal);
            continue;
        }
        // If the valToGenerate is < 'minOverride', use the override instead.
        if (minOverride && valToGenerate < *minOverride) {
            valToGenerate = *minOverride;
        }
        createDocGenerator(
            valToGenerate,
            RangeStatement(_range.getStep(), ExplicitBounds(valToGenerate, max), _range.getUnit()),
            firstPartition.getDocument(),
            boost::none  // final doc.
        );
        // Remove this partition from the table, we're done with it.
        _partitionTable.erase(firstPartitionKeyVal);
        _densifyState = DensifyState::kHaveGenerator;
        auto nextDoc = _docGenerator->getNextDocument();
        if (_docGenerator->done()) {
            _docGenerator = boost::none;
            _densifyState = DensifyState::kFinishingDensify;
        }
        return nextDoc;
    }
    _densifyState = DensifyState::kDensifyDone;
    return DocumentSource::GetNextResult::makeEOF();
}
DocumentSource::GetNextResult DocumentSourceInternalDensify::finishDensifyingPartitionedInput() {
    // If the partition map is empty, we're done.
    if (_partitionTable.size() == 0) {
        _densifyState = DensifyState::kDensifyDone;
        return DocumentSource::GetNextResult::makeEOF();
    }
    return stdx::visit(
        OverloadedVisitor{
            [&](Full) {
                // Densify between partitions's last seen value and global max.
                tassert(5733707, "_current must be set if partitionTable is non-empty", _current);
                return finishDensifyingPartitionedInputHelper(
                    _globalMax->isOnStepRelativeTo(*_current, _range)
                        ? _globalMax->increment(_range)
                        : *_globalMax);
            },
            [&](Partition) {
                // Partition bounds don't do any extra work after EOF;
                MONGO_UNREACHABLE_TASSERT(5733704);
                return DocumentSource::GetNextResult::makeEOF();
            },
            [&](ExplicitBounds bounds) {
                // Densify between partitions's last seen value and global max. Use the override for
                // the global min.
                return finishDensifyingPartitionedInputHelper(bounds.second, bounds.first);
            }},
        _range.getBounds());
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::handleSourceExhausted() {
    _eof = true;
    return stdx::visit(OverloadedVisitor{
                           [&](RangeStatement::Full) {
                               if (_partitionExpr) {
                                   return finishDensifyingPartitionedInput();
                               } else {
                                   _densifyState = DensifyState::kDensifyDone;
                                   return DocumentSource::GetNextResult::makeEOF();
                               }
                           },
                           [&](RangeStatement::Partition) {
                               // We have already densified up to the last document in each
                               // partition.
                               _densifyState = DensifyState::kDensifyDone;
                               return DocumentSource::GetNextResult::makeEOF();
                           },
                           [&](RangeStatement::ExplicitBounds bounds) {
                               if (_partitionExpr) {
                                   return finishDensifyingPartitionedInput();
                               }
                               return densifyExplicitRangeAfterEOF();
                           },
                       },
                       _range.getBounds());
}


DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGen(Document currentDoc) {
    auto val = getDensifyValue(currentDoc);
    auto nextValToGenerate = _current->increment(_range);

    // If the current value is the next value to be generated, save it as the current (last seen)
    // value.
    if (val == nextValToGenerate) {
        setPartitionValue(currentDoc);
        _current = val;
    }
    // If we don't need to create a generator (no intervening documents to generate before
    // outputting currentDoc), then don't create a generator or update _current.
    if (val <= nextValToGenerate) {
        return currentDoc;
    }

    // Falling through the above conditions means the currentDoc is strictly greater than the last
    // seen document plus the step value.
    auto newCurrent = _current->increment(_range);
    createDocGenerator(
        newCurrent,
        RangeStatement(_range.getStep(), ExplicitBounds(newCurrent, val), _range.getUnit()),
        _partitionExpr
            ? boost::make_optional<Document>(getDensifyPartition(currentDoc).getDocument())
            : boost::none,
        currentDoc);

    _densifyState = DensifyState::kHaveGenerator;
    auto nextDoc = _docGenerator->getNextDocument();
    if (_docGenerator->done()) {
        _docGenerator = boost::none;
        _densifyState = DensifyState::kNeedGen;
    }
    // Documents generated by the generator are always on the step.
    _current = getDensifyValue(nextDoc);
    // If we are partitioned, save the most recent doc.
    setPartitionValue(nextDoc);
    return nextDoc;
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGenExplicit(
    Document currentDoc) {
    auto bounds = stdx::get<ExplicitBounds>(_range.getBounds());
    auto val = getDensifyValue(currentDoc);
    auto where = getPositionRelativeToRange(val);
    switch (where) {
        case ValComparedToRange::kInside: {
            auto nextStep = _current->increment(_range);
            if (val == nextStep) {
                _current = val;
                setPartitionValue(currentDoc);
                return currentDoc;
            } else if (val < nextStep) {
                return currentDoc;
            }
            return processDocAboveExplicitMinBound(currentDoc);
        }
        case ValComparedToRange::kAbove: {
            auto nextStep = _current->increment(_range);
            if (nextStep >= bounds.second) {
                _current = nextStep;
                // If we are partitioning other partitions may still need to densify.
                setPartitionValue(currentDoc);
                if (!_partitionExpr) {
                    _densifyState = DensifyState::kDensifyDone;
                }
                return currentDoc;
            }
            return processDocAboveExplicitMinBound(currentDoc);
        }
        case ValComparedToRange::kRangeMin: {
            setPartitionValue(currentDoc);
            _current = val;
            return currentDoc;
        }
        case ValComparedToRange::kBelow: {
            setPartitionValue(currentDoc);
            _densifyState = DensifyState::kUninitializedOrBelowRange;
            return currentDoc;
        }
        default: {
            MONGO_UNREACHABLE_TASSERT(5733705);
        }
    }
}
boost::intrusive_ptr<DocumentSource> DocumentSourceInternalDensify::createFromBson(
    BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    auto results = document_source_densify::createFromBsonInternal(elem, expCtx, kStageName, true);
    tassert(5733413,
            "When creating an $_internalDensify stage, only one stage should be returned",
            results.size() == 1);
    return results.front();
}

Value DocumentSourceInternalDensify::serialize(SerializationOptions opts) const {
    MutableDocument spec;
    spec[kFieldFieldName] = Value(opts.serializeFieldPath(_field));
    std::vector<Value> serializedPartitionByFields(_partitions.size());
    std::transform(_partitions.begin(),
                   _partitions.end(),
                   serializedPartitionByFields.begin(),
                   [&](FieldPath field) -> Value { return Value(opts.serializeFieldPath(field)); });
    spec[kPartitionByFieldsFieldName] = Value(serializedPartitionByFields);
    spec[kRangeFieldName] = _range.serialize(opts);
    MutableDocument out;
    out[getSourceName()] = Value(spec.freeze());

    return Value(out.freezeToValue());
}

void DocumentSourceInternalDensify::initializePartitionState(Document initialDoc) {
    // Initialize _partitionExpr from _partitions.

    // We check whether there is anything in _partitions during parsing.
    tassert(
        6154800, "Expected at least one field when partitioning is enabled.", !_partitions.empty());

    MutableDocument partitionExpr;
    for (auto&& p : _partitions) {
        partitionExpr.setNestedField(p.fullPath(), Value{"$"_sd + p.fullPath()});
    }
    _partitionExpr = ExpressionObject::parse(
        pExpCtx.get(), partitionExpr.freeze().toBson(), pExpCtx->variablesParseState);

    setPartitionValue(initialDoc);
}

DocumentSource::GetNextResult DocumentSourceInternalDensify::doGetNext() {
    // When we return a generated document '_docsGenerated' is incremented. Check that the last
    // document didn't put us over the limit.
    uassert(5897900,
            str::stream() << "Generated " << _docsGenerated
                          << " documents in $densify, which is over the limit of " << _maxDocs
                          << ". Increase the 'internalQueryMaxAllowedDensifyDocs' parameter to "
                             "allow more generated documents",
            _docsGenerated <= _maxDocs);
    switch (_densifyState) {
        case DensifyState::kUninitializedOrBelowRange: {
            // This state represents the first run of doGetNext() or that the value that we last
            // pulled is below the range.

            auto nextDoc = pSource->getNext();
            if (!nextDoc.isAdvanced()) {
                if (nextDoc.isEOF()) {
                    return handleSourceExhausted();
                }
                return nextDoc;
            }

            auto doc = nextDoc.getDocument();
            if (doc.getNestedField(_field).nullish()) {
                // The densify field is not present or null, let document pass unmodified.
                return nextDoc;
            }

            auto val = getDensifyValue(doc);

            // If we have partitions specified, setup the partition expression and table.
            if (_partitions.size() != 0 && !_partitionExpr) {
                initializePartitionState(doc);
            }

            return stdx::visit(
                OverloadedVisitor{
                    [&](Full) {
                        _current = val;
                        _globalMin = val;
                        _globalMax = val;
                        _densifyState = DensifyState::kNeedGen;
                        return nextDoc;
                    },
                    [&](Partition) {
                        tassert(5734400,
                                "Partition state must be initialized for partition bounds",
                                _partitionExpr);
                        _densifyState = DensifyState::kNeedGen;
                        return nextDoc;
                    },
                    [&](ExplicitBounds bounds) {
                        return processFirstDocForExplicitRange(doc);
                    }},
                _range.getBounds());
        }
        case DensifyState::kNeedGen: {
            tassert(8423305, "Document generator must not exist in this state.", !_docGenerator);

            auto nextDoc = pSource->getNext();
            if (!nextDoc.isAdvanced()) {
                if (nextDoc.isEOF()) {
                    return handleSourceExhausted();
                }
                return nextDoc;
            }

            auto currentDoc = nextDoc.getDocument();
            if (currentDoc.getNestedField(_field).nullish()) {
                // The densify field is not present or null, let document pass unmodified.
                return nextDoc;
            }
            auto val = getDensifyValue(currentDoc);

            return stdx::visit(
                OverloadedVisitor{[&](Full) {
                                      if (_partitionExpr) {
                                          // Keep track of '_globalMax' for later. The latest
                                          // document from the source is always the max.
                                          _globalMax = val;
                                          // If we haven't seen this partition before, densify
                                          // between
                                          // '_globalMin' and this value.
                                          auto partitionVal = getDensifyPartition(currentDoc);
                                          auto foundPartitionVal =
                                              _partitionTable.find(partitionVal);
                                          if (foundPartitionVal == _partitionTable.end()) {
                                              // _current represents the last value seen. We want to
                                              // generate _globalMin, so pretend we've seen the
                                              // value before that.
                                              _current = _globalMin->decrement(_range);
                                              // Insert the new partition into the table.
                                              setPartitionValue(currentDoc);
                                              return handleNeedGen(currentDoc);
                                          }
                                          // Otherwise densify between the last seen value and this
                                          // one.
                                          _current = foundPartitionVal->second;
                                      }
                                      return handleNeedGen(currentDoc);
                                  },
                                  [&](Partition) {
                                      // If we haven't seen this partition before, add it to the
                                      // table then return.
                                      auto partitionVal = getDensifyPartition(currentDoc);
                                      auto foundPartitionVal = _partitionTable.find(partitionVal);
                                      if (foundPartitionVal == _partitionTable.end()) {
                                          setPartitionValue(currentDoc);
                                          return nextDoc;
                                      }
                                      // Reset current to be the last value in this partition.
                                      _current = foundPartitionVal->second;
                                      return handleNeedGen(currentDoc);
                                  },
                                  [&](ExplicitBounds bounds) {
                                      if (_partitionExpr) {
                                          // If we haven't seen this partition before, add it to the
                                          // table then check where it is in the range.
                                          auto partitionVal = getDensifyPartition(currentDoc);
                                          auto foundPartitionVal =
                                              _partitionTable.find(partitionVal);
                                          if (foundPartitionVal == _partitionTable.end()) {
                                              setPartitionValue(currentDoc);
                                              // This partition has seen no values.
                                              _current = boost::none;
                                              return processFirstDocForExplicitRange(currentDoc);
                                          }
                                          // Otherwise reset current to be the last value in this
                                          // partition.
                                          _current = foundPartitionVal->second;
                                      }
                                      return handleNeedGenExplicit(nextDoc.getDocument());
                                  }},
                _range.getBounds());
        }
        case DensifyState::kHaveGenerator: {
            tassert(5733203,
                    "Densify state is kHaveGenerator but DocGenerator is null or done.",
                    _docGenerator && !_docGenerator->done());

            auto generatedDoc = _docGenerator->getNextDocument();

            return stdx::visit(
                OverloadedVisitor{[&](Full) {
                                      if (_docGenerator->done()) {
                                          _docGenerator = boost::none;
                                          if (_eof && _partitionExpr) {
                                              _densifyState = DensifyState::kFinishingDensify;
                                          } else {
                                              _densifyState = DensifyState::kNeedGen;
                                          }
                                      }
                                      // The generator's final document may not be on the
                                      // step.
                                      auto genDensifyVal = getDensifyValue(generatedDoc);
                                      if (genDensifyVal == _current->increment(_range)) {
                                          _current = genDensifyVal;
                                          setPartitionValue(generatedDoc);
                                      }
                                      return generatedDoc;
                                  },
                                  [&](Partition) {
                                      if (_docGenerator->done()) {
                                          _docGenerator = boost::none;
                                          _densifyState = DensifyState::kNeedGen;
                                      }
                                      // The generator's final document may not be on the
                                      // step.
                                      auto genDensifyVal = getDensifyValue(generatedDoc);
                                      if (genDensifyVal == _current->increment(_range)) {
                                          _current = genDensifyVal;
                                          setPartitionValue(generatedDoc);
                                      }
                                      return generatedDoc;
                                  },
                                  [&](ExplicitBounds bounds) {
                                      auto val = getDensifyValue(generatedDoc);
                                      // Only want to update the rangeMin if the value -
                                      // current is divisible by the step.
                                      if (val.isOnStepRelativeTo(*_current, _range)) {
                                          _current = val;
                                          setPartitionValue(generatedDoc);
                                      }
                                      resetDocGen(bounds);
                                      return generatedDoc;
                                  }},
                _range.getBounds());
        }
        case DensifyState::kFinishingDensify: {
            tassert(5734402,
                    "Densify expected to have already hit EOF in FinishingDensify state",
                    _eof);
            return finishDensifyingPartitionedInput();
        }
        case DensifyState::kDensifyDone: {
            // In the full range, this should only return EOF.
            // In the explicit range we finish densifying over the range and any remaining documents
            // is passed to the next stage.
            auto doc = pSource->getNext();
            if (stdx::holds_alternative<Full>(_range.getBounds())) {
                tassert(5734005,
                        "GetNextResult must be EOF in kDensifyDone and kFull state",
                        !doc.isAdvanced());
            }
            return doc;
        }
        default: {
            MONGO_UNREACHABLE_TASSERT(5733706);
        }
    }  // namespace mongo
}

DensifyValue DensifyValue::increment(const RangeStatement& range) const {
    return stdx::visit(
        OverloadedVisitor{
            [&](Value val) {
                return DensifyValue(uassertStatusOK(ExpressionAdd::apply(val, range.getStep())));
            },
            [&](Date_t date) {
                return DensifyValue(dateAdd(
                    date, range.getUnit().value(), range.getStep().coerceToLong(), timezone()));
            }},
        _value);
}

DensifyValue DensifyValue::decrement(const RangeStatement& range) const {
    return stdx::visit(
        OverloadedVisitor{
            [&](Value val) {
                return DensifyValue(
                    uassertStatusOK(ExpressionSubtract::apply(val, range.getStep())));
            },
            [&](Date_t date) {
                return DensifyValue(dateAdd(
                    date, range.getUnit().value(), -range.getStep().coerceToLong(), timezone()));
            }},
        _value);
}

bool DensifyValue::isOnStepRelativeTo(DensifyValue base, RangeStatement range) const {
    return stdx::visit(
        OverloadedVisitor{
            [&](Value val) {
                Value diff = uassertStatusOK(ExpressionSubtract::apply(val, base.getNumber()));
                Value remainder = uassertStatusOK(ExpressionMod::apply(diff, range.getStep()));
                return remainder.getDouble() == 0.0;
            },
            [&](Date_t date) {
                auto unit = range.getUnit().value();
                long long step = range.getStep().coerceToLong();
                auto baseDate = base.getDate();

                // Months, quarters and years have variable lengths depending on leap days
                // and days-in-a-month, so a step is not a constant number of milliseconds
                // across all dates. For these units, we need to iterate through rather than
                // performing a calculation with modulo. As long as `baseDate` is not a large number
                // of steps away from the value we're testing (as is true in our usage with _current
                // as the base), this should not be a performance issue.
                if (unit == TimeUnit::month || unit == TimeUnit::quarter ||
                    unit == TimeUnit::year) {

                    Date_t steppedDate = baseDate;
                    while (steppedDate < date) {
                        steppedDate = dateAdd(steppedDate, unit, step, timezone());
                    }
                    return steppedDate == date;
                } else {
                    // Steps with units smaller than one month are always constant sized
                    // (because unix time does not have leap seconds), so we can perform
                    // modulo arithmetic.
                    auto testMillis = date.toMillisSinceEpoch();
                    auto baseMillis = baseDate.toMillisSinceEpoch();
                    auto stepDurationInMillis =
                        dateAdd(Date_t::fromMillisSinceEpoch(0), unit, step, timezone())
                            .toMillisSinceEpoch();
                    auto diff = testMillis - baseMillis;
                    return diff % stepDurationInMillis == 0;
                }
            }},
        _value);
}

Pipeline::SourceContainer::iterator DocumentSourceInternalDensify::combineSorts(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    if (std::next(itr) == container->end() || itr == container->begin()) {
        return container->end();
    }

    // We can only combine the sorts if we can guarantee the output order will maintain the
    // sort. Densify changes the sort order if partitions are present and range is type 'full'.
    if (_partitions.size() != 0 && stdx::holds_alternative<Full>(_range.getBounds())) {
        // We will not maintain sort order.
        return std::next(itr);
    }

    // If $densify was the first stage in the pipeline, there should be a preceding sort.
    tassert(6059802, "$_internalDensify did not have a preceding stage", itr != container->begin());
    // Get the spec of the preceding sort stage. Densify always has a preceding sort, unless
    // the preceding sort was already removed by an earlier stage.
    const auto preSortItr = std::prev(itr);
    const auto preSortStage = dynamic_cast<DocumentSourceSort*>((*preSortItr).get());
    if (!preSortStage || preSortStage->getLimit()) {
        return std::next(itr);
    }

    // Check that the preceding sort was actually generated by $densify, and not by combining the
    // generated sort with a sort earlier in the pipeline.
    auto densifySortPattern =
        document_source_densify::getSortPatternForDensify(_range, _partitions, _field);

    auto preDensifySortPattern = preSortStage->getSortKeyPattern();
    if (densifySortPattern != preDensifySortPattern) {
        return std::next(itr);
    }

    // Get the spec of the following sort stage, if it exists.
    const auto postSortItr = std::next(itr);
    const auto postSortStage = dynamic_cast<DocumentSourceSort*>((*postSortItr).get());
    if (!postSortStage || postSortStage->getLimit()) {
        // If there is not a following sort stage, we won't do any optimization. Return the next
        // stage in the pipeline.
        return std::next(itr);
    }
    auto postDensifySortPattern = postSortStage->getSortKeyPattern();

    // We can only combine the sorts if the sorts are compatible. $densify only preserves a sort on
    // the fields on which it operates, as any other fields will be missing in generated documents.
    if (!preDensifySortPattern.isExtensionOf(postDensifySortPattern)) {
        return std::next(itr);
    }

    // If the post sort is longer, we would have bailed earlier. Remove the sort after the $densify.
    container->erase(postSortItr);

    return std::prev(itr);
}

Pipeline::SourceContainer::iterator DocumentSourceInternalDensify::doOptimizeAt(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    tassert(6059800, "Expected to optimize $densify stage", *itr == this);

    return combineSorts(itr, container);
}
}  // namespace mongo