diff options
author | David Percy <david.percy@mongodb.com> | 2022-01-28 23:33:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-16 18:12:45 +0000 |
commit | 65348d1c32d7b46eea6e8120e56337c4475afc49 (patch) | |
tree | 522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src | |
parent | 729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff) | |
download | mongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz |
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 21 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 120 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 143 |
4 files changed, 382 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index b47fb4036f0..9a9f6aeb7d0 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -43,8 +44,10 @@ #include "mongo/db/pipeline/skip_and_limit.h" #include "mongo/db/query/collation/collation_index_key.h" #include "mongo/db/stats/resource_consumption_metrics.h" +#include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/util/assert_util.h" namespace mongo { @@ -75,8 +78,47 @@ REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, DocumentSourceSort::createFromBson, AllowedWithApiStrict::kAlways); +REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( + _internalBoundedSort, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceSort::parseBoundedSort, + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost:: + none /* TODO SERVER-52286 feature_flags::gFeatureFlagBucketUnpackWithSort.getVersion() */, + feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabledAndIgnoreFCV()); + DocumentSource::GetNextResult DocumentSourceSort::doGetNext() { + if (_timeSorter) { + // Only pull input as necessary to get _timeSorter to have a result. + while (_timeSorter->getState() == TimeSorter::State::kWait) { + auto input = pSource->getNext(); + switch (input.getStatus()) { + case GetNextResult::ReturnStatus::kPauseExecution: + return input; + case GetNextResult::ReturnStatus::kEOF: + // Tell _timeSorter there will be no more input. In response, its state + // will never be kWait again, and so we'll never call pSource->getNext() again. + _timeSorter->done(); + continue; + case GetNextResult::ReturnStatus::kAdvanced: + Document doc = input.getDocument(); + auto time = doc.getField(_sortExecutor->sortPattern()[0].fieldPath->fullPath()); + uassert(6369909, + "$_internalBoundedSort only handles Date values", + time.getType() == Date); + _timeSorter->add(time.getDate(), doc); + continue; + } + } + + if (_timeSorter->getState() == TimeSorter::State::kDone) + return GetNextResult::makeEOF(); + + return _timeSorter->next().second; + } + if (!_populated) { const auto populationResult = populate(); if (populationResult.isPaused()) { @@ -101,6 +143,25 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSort::clone() const { void DocumentSourceSort::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { + if (_timeSorter) { + tassert(6369900, + "$_internalBoundedSort should not absorb a $limit", + !_sortExecutor->hasLimit()); + + // TODO SERVER-63637 Implement execution stats. + + // {$_internalBoundedSort: {sortKey, bound}} + auto sortKey = _sortExecutor->sortPattern().serialize( + SortPattern::SortKeySerialization::kForPipelineSerialization); + array.push_back(Value{Document{{ + {"$_internalBoundedSort"_sd, + Document{{ + {"sortKey"_sd, std::move(sortKey)}, + {"bound"_sd, durationCount<Seconds>(_timeSorter->makeBound.bound)}, + }}}, + }}}); + return; + } uint64_t limit = _sortExecutor->getLimit(); @@ -145,6 +206,11 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (_timeSorter) { + // Do not absorb a limit, or combine with other sort stages. + return std::next(itr); + } + auto stageItr = std::next(itr); auto limit = extractLimitForPushdown(stageItr, container); if (limit) @@ -209,6 +275,33 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( return pSort; } +intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + uassert(6369905, + "the $_internalBoundedSort key specification must be an object", + elem.type() == Object); + BSONObj args = elem.embeddedObject(); + + BSONElement key = args["sortKey"]; + uassert(6369904, "$_internalBoundedSort sortKey must be an object", key.type() == Object); + SortPattern pat{key.embeddedObject(), expCtx}; + uassert(6369903, "$_internalBoundedSort doesn't support compound sort", pat.size() == 1); + uassert(6369902, "$_internalBoundedSort only handles ascending sort", pat[0].isAscending); + uassert(6369901, + "$_internalBoundedSort doesn't support an expression in the sortKey", + pat[0].expression == nullptr); + uassert(6369907, + "$_internalBoundedSort doesn't support dotted field names", + pat[0].fieldPath->getPathLength() == 1); + + BSONElement bound = args["bound"]; + int boundN = uassertStatusOK(bound.parseIntegerElementToNonNegativeInt()); + + auto ds = DocumentSourceSort::create(expCtx, pat); + ds->_timeSorter.reset(new TimeSorter{{}, BoundMaker{Seconds{boundN}}}); + return ds; +} + DocumentSource::GetNextResult DocumentSourceSort::populate() { auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { @@ -260,6 +353,11 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co } boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() { + uassert(6369906, + "$_internalBoundedSort cannot be the first stage on the merger, because it requires " + "almost-sorted input, which the shardsPart of a pipeline can't provide", + !_timeSorter); + DistributedPlanLogic split; split.shardsStage = this; split.inputSortPattern = _sortExecutor->sortPattern() diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 1660c91b9f2..1043f9a5d9a 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -113,6 +113,12 @@ public: } /** + * Parse a stage that uses BoundedSorter. + */ + static boost::intrusive_ptr<DocumentSourceSort> parseBoundedSort( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** * Returns the the limit, if a subsequent $limit stage has been coalesced with this $sort stage. * Otherwise, returns boost::none. */ @@ -189,6 +195,21 @@ private: boost::optional<SortExecutor<Document>> _sortExecutor; boost::optional<SortKeyGenerator> _sortKeyGen; + + struct BoundMaker { + Seconds bound; + + Date_t operator()(Date_t key) { + return key - bound; + } + }; + struct Comp { + int operator()(Date_t x, Date_t y) const { + return x.toMillisSinceEpoch() - y.toMillisSinceEpoch(); + } + }; + using TimeSorter = BoundedSorter<Date_t, Document, Comp, BoundMaker>; + std::unique_ptr<TimeSorter> _timeSorter; }; } // namespace mongo diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 8a0c696adde..945b50e4d6a 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -35,12 +35,14 @@ #include <deque> #include <fstream> #include <memory> +#include <queue> #include <string> #include <utility> #include <vector> #include "mongo/bson/util/builder.h" #include "mongo/db/sorter/sorter_gen.h" +#include "mongo/util/assert_util.h" #include "mongo/util/bufreader.h" /** @@ -367,6 +369,124 @@ protected: }; /** + * Sorts data that is already "almost sorted", meaning we can put a bound on how out-of-order + * any two input elements are. For example, maybe we are sorting by {time: 1} and we know that no + * two documents are more than an hour out of order. This means as soon as we see {time: t}, we know + * that any document earlier than {time: t - 1h} is safe to return. + * + * Note what's bounded is the difference in sort-key values, not the number of inversions. + * This means we don't know how much space we'll need. + * + * This is not a subclass of Sorter because the interface is different: Sorter has a strict + * separation between reading input and returning results, while BoundedSorter can alternate + * between the two. + * + * Comparator does a 3-way comparison between two Keys: comp(x, y) < 0 iff x < y. + * + * BoundMaker takes a Key from the input, and computes a bound. The bound is a Key that is + * less-or-equal to all future Keys that will be seen in the input. + */ +template <typename Key, typename Value, typename Comparator, typename BoundMaker> +class BoundedSorter { +public: + // 'Comparator' is a 3-way comparison, but std::priority_queue wants a '<' comparison. + // But also, std::priority_queue is a max-heap, and we want a min-heap. + // And also, 'Comparator' compares Keys, but std::priority_queue calls its comparator + // on whole elements. + struct Greater { + bool operator()(const std::pair<Key, Value>& p1, const std::pair<Key, Value>& p2) const { + return compare(p1.first, p2.first) > 0; + } + Comparator compare; + }; + + BoundedSorter(Comparator comp, BoundMaker makeBound) + : compare(comp), makeBound(makeBound), _heap(Greater{comp}) {} + + // Feed one item of input to the sorter. + // Together, add() and done() represent the input stream. + void add(Key key, Value value) { + invariant(!_done); + // If a new value violates what we thought was our min bound, something has gone wrong. + if (checkInput && _min) + uassert(6369910, "BoundedSorter input is too out-of-order.", compare(*_min, key) <= 0); + + // Each new item can potentially give us a tighter bound (a higher min). + Key newMin = makeBound(key); + if (!_min || compare(*_min, newMin) < 0) + _min = newMin; + + _heap.emplace(std::move(key), std::move(value)); + } + + // Indicate that no more input will arrive. + // Together, add() and done() represent the input stream. + void done() { + invariant(!_done); + _done = true; + } + + enum class State { + // An output document is not available yet, but this may change as more input arrives. + kWait, + // An output document is available now: you may call next() once. + kReady, + // All output has been returned. + kDone, + }; + // Together, state() and next() represent the output stream. + // See BoundedSorter::State for the meaning of each case. + State getState() const { + if (_done) { + // No more input will arrive, so we're never in state kWait. + return _heap.empty() ? State::kDone : State::kReady; + } else { + if (_heap.empty()) + return State::kWait; + dassert(_min); + + // _heap.top() is the min of _heap, but we also need to consider whether a smaller input + // will arrive later. So _heap.top() is safe to return only if heap.top() < _min. + if (compare(_heap.top().first, *_min) < 0) + return State::kReady; + + // A later call to add() may improve _min. Or in the worst case, after done() is called + // we will return everything in _heap. + return State::kWait; + } + } + + // Remove and return one item of output. + // Only valid to call when getState() == kReady. + // Together, state() and next() represent the output stream. + std::pair<Key, Value> next() { + dassert(getState() == State::kReady); + auto result = _heap.top(); + _heap.pop(); + return result; + } + + size_t size() const { + return _heap.size(); + } + + // By default, uassert that the input meets our assumptions of being almost-sorted. + // But if _checkInput is false, don't do that check. + // The output will be in the wrong order but otherwise it should work. + bool checkInput = true; + + Comparator compare; + BoundMaker makeBound; + +private: + using KV = std::pair<Key, Value>; + std::priority_queue<KV, std::vector<KV>, Greater> _heap; + + boost::optional<Key> _min; + bool _done = false; +}; + +/** * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file * range. */ diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 5c3c1ed74a3..8fa5704239f 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -902,6 +902,149 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { } } +class BoundedSorterTest : public unittest::Test { +public: + using Key = int; + struct Doc { + Key time; + + bool operator==(const Doc& other) { + return time == other.time; + } + }; + struct Comparator { + int operator()(Key x, Key y) const { + return x - y; + } + }; + struct BoundMaker { + Key operator()(Key k) const { + return k - 10; + } + }; + using S = BoundedSorter<Key, Doc, Comparator, BoundMaker>; + + /** + * Feed the input into the sorter one-by-one, taking any output as soon as it's available. + */ + std::vector<Doc> sort(std::vector<Doc> input) { + std::vector<Doc> output; + auto push = [&](Doc doc) { output.push_back(doc); }; + + for (auto&& doc : input) { + sorter.add(doc.time, doc); + while (sorter.getState() == S::State::kReady) + push(sorter.next().second); + } + sorter.done(); + + while (sorter.getState() == S::State::kReady) + push(sorter.next().second); + ASSERT(sorter.getState() == S::State::kDone); + + ASSERT(output.size() == input.size()); + return output; + } + + static void assertSorted(const std::vector<Doc>& docs) { + for (size_t i = 1; i < docs.size(); ++i) { + Doc prev = docs[i - 1]; + Doc curr = docs[i]; + ASSERT_LTE(prev.time, curr.time); + } + } + + S sorter{{}, {}}; +}; +TEST_F(BoundedSorterTest, Empty) { + ASSERT(sorter.getState() == S::State::kWait); + + sorter.done(); + ASSERT(sorter.getState() == S::State::kDone); +} +TEST_F(BoundedSorterTest, Sorted) { + auto output = sort({ + {0}, + {3}, + {10}, + {11}, + {12}, + {13}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, SortedExceptOne) { + auto output = sort({ + {0}, + {3}, + {10}, + // Swap 11 and 12. + {12}, + {11}, + {13}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, AlmostSorted) { + auto output = sort({ + // 0 and 11 cannot swap. + {0}, + {11}, + {13}, + {10}, + {12}, + // 3 and 14 cannot swap. + {3}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, WrongInput) { + std::vector<Doc> input = { + {3}, + {4}, + {5}, + {10}, + {15}, + // This 1 is too far out of order: it's more than 10 away from 15. + // So it will appear too late in the output. + // We will still be hanging on to anything in the range [5, inf). + // So we will have already returned 3, 4. + {1}, + {16}, + }; + + // Disable input order checking so we can see what happens. + sorter.checkInput = false; + auto output = sort(input); + ASSERT_EQ(output.size(), 7); + + ASSERT_EQ(output[0].time, 3); + ASSERT_EQ(output[1].time, 4); + ASSERT_EQ(output[2].time, 1); // Out of order. + ASSERT_EQ(output[3].time, 5); + ASSERT_EQ(output[4].time, 10); + ASSERT_EQ(output[5].time, 15); + ASSERT_EQ(output[6].time, 16); + + // Test that by default, bad input like this would be detected. + sorter = S{{}, {}}; + ASSERT(sorter.checkInput); + ASSERT_THROWS_CODE(sort(input), DBException, 6369910); +} + + } // namespace } // namespace sorter } // namespace mongo |