diff options
author | David Percy <david.percy@mongodb.com> | 2022-01-28 23:33:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-16 18:12:45 +0000 |
commit | 65348d1c32d7b46eea6e8120e56337c4475afc49 (patch) | |
tree | 522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src/mongo/db/sorter | |
parent | 729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff) | |
download | mongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz |
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src/mongo/db/sorter')
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 120 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 143 |
2 files changed, 263 insertions, 0 deletions
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 8a0c696adde..945b50e4d6a 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -35,12 +35,14 @@ #include <deque> #include <fstream> #include <memory> +#include <queue> #include <string> #include <utility> #include <vector> #include "mongo/bson/util/builder.h" #include "mongo/db/sorter/sorter_gen.h" +#include "mongo/util/assert_util.h" #include "mongo/util/bufreader.h" /** @@ -367,6 +369,124 @@ protected: }; /** + * Sorts data that is already "almost sorted", meaning we can put a bound on how out-of-order + * any two input elements are. For example, maybe we are sorting by {time: 1} and we know that no + * two documents are more than an hour out of order. This means as soon as we see {time: t}, we know + * that any document earlier than {time: t - 1h} is safe to return. + * + * Note what's bounded is the difference in sort-key values, not the number of inversions. + * This means we don't know how much space we'll need. + * + * This is not a subclass of Sorter because the interface is different: Sorter has a strict + * separation between reading input and returning results, while BoundedSorter can alternate + * between the two. + * + * Comparator does a 3-way comparison between two Keys: comp(x, y) < 0 iff x < y. + * + * BoundMaker takes a Key from the input, and computes a bound. The bound is a Key that is + * less-or-equal to all future Keys that will be seen in the input. + */ +template <typename Key, typename Value, typename Comparator, typename BoundMaker> +class BoundedSorter { +public: + // 'Comparator' is a 3-way comparison, but std::priority_queue wants a '<' comparison. + // But also, std::priority_queue is a max-heap, and we want a min-heap. + // And also, 'Comparator' compares Keys, but std::priority_queue calls its comparator + // on whole elements. + struct Greater { + bool operator()(const std::pair<Key, Value>& p1, const std::pair<Key, Value>& p2) const { + return compare(p1.first, p2.first) > 0; + } + Comparator compare; + }; + + BoundedSorter(Comparator comp, BoundMaker makeBound) + : compare(comp), makeBound(makeBound), _heap(Greater{comp}) {} + + // Feed one item of input to the sorter. + // Together, add() and done() represent the input stream. + void add(Key key, Value value) { + invariant(!_done); + // If a new value violates what we thought was our min bound, something has gone wrong. + if (checkInput && _min) + uassert(6369910, "BoundedSorter input is too out-of-order.", compare(*_min, key) <= 0); + + // Each new item can potentially give us a tighter bound (a higher min). + Key newMin = makeBound(key); + if (!_min || compare(*_min, newMin) < 0) + _min = newMin; + + _heap.emplace(std::move(key), std::move(value)); + } + + // Indicate that no more input will arrive. + // Together, add() and done() represent the input stream. + void done() { + invariant(!_done); + _done = true; + } + + enum class State { + // An output document is not available yet, but this may change as more input arrives. + kWait, + // An output document is available now: you may call next() once. + kReady, + // All output has been returned. + kDone, + }; + // Together, state() and next() represent the output stream. + // See BoundedSorter::State for the meaning of each case. + State getState() const { + if (_done) { + // No more input will arrive, so we're never in state kWait. + return _heap.empty() ? State::kDone : State::kReady; + } else { + if (_heap.empty()) + return State::kWait; + dassert(_min); + + // _heap.top() is the min of _heap, but we also need to consider whether a smaller input + // will arrive later. So _heap.top() is safe to return only if heap.top() < _min. + if (compare(_heap.top().first, *_min) < 0) + return State::kReady; + + // A later call to add() may improve _min. Or in the worst case, after done() is called + // we will return everything in _heap. + return State::kWait; + } + } + + // Remove and return one item of output. + // Only valid to call when getState() == kReady. + // Together, state() and next() represent the output stream. + std::pair<Key, Value> next() { + dassert(getState() == State::kReady); + auto result = _heap.top(); + _heap.pop(); + return result; + } + + size_t size() const { + return _heap.size(); + } + + // By default, uassert that the input meets our assumptions of being almost-sorted. + // But if _checkInput is false, don't do that check. + // The output will be in the wrong order but otherwise it should work. + bool checkInput = true; + + Comparator compare; + BoundMaker makeBound; + +private: + using KV = std::pair<Key, Value>; + std::priority_queue<KV, std::vector<KV>, Greater> _heap; + + boost::optional<Key> _min; + bool _done = false; +}; + +/** * Appends a pre-sorted range of data to a given file and hands back an Iterator over that file * range. */ diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 5c3c1ed74a3..8fa5704239f 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -902,6 +902,149 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) { } } +class BoundedSorterTest : public unittest::Test { +public: + using Key = int; + struct Doc { + Key time; + + bool operator==(const Doc& other) { + return time == other.time; + } + }; + struct Comparator { + int operator()(Key x, Key y) const { + return x - y; + } + }; + struct BoundMaker { + Key operator()(Key k) const { + return k - 10; + } + }; + using S = BoundedSorter<Key, Doc, Comparator, BoundMaker>; + + /** + * Feed the input into the sorter one-by-one, taking any output as soon as it's available. + */ + std::vector<Doc> sort(std::vector<Doc> input) { + std::vector<Doc> output; + auto push = [&](Doc doc) { output.push_back(doc); }; + + for (auto&& doc : input) { + sorter.add(doc.time, doc); + while (sorter.getState() == S::State::kReady) + push(sorter.next().second); + } + sorter.done(); + + while (sorter.getState() == S::State::kReady) + push(sorter.next().second); + ASSERT(sorter.getState() == S::State::kDone); + + ASSERT(output.size() == input.size()); + return output; + } + + static void assertSorted(const std::vector<Doc>& docs) { + for (size_t i = 1; i < docs.size(); ++i) { + Doc prev = docs[i - 1]; + Doc curr = docs[i]; + ASSERT_LTE(prev.time, curr.time); + } + } + + S sorter{{}, {}}; +}; +TEST_F(BoundedSorterTest, Empty) { + ASSERT(sorter.getState() == S::State::kWait); + + sorter.done(); + ASSERT(sorter.getState() == S::State::kDone); +} +TEST_F(BoundedSorterTest, Sorted) { + auto output = sort({ + {0}, + {3}, + {10}, + {11}, + {12}, + {13}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, SortedExceptOne) { + auto output = sort({ + {0}, + {3}, + {10}, + // Swap 11 and 12. + {12}, + {11}, + {13}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, AlmostSorted) { + auto output = sort({ + // 0 and 11 cannot swap. + {0}, + {11}, + {13}, + {10}, + {12}, + // 3 and 14 cannot swap. + {3}, + {14}, + {15}, + {16}, + }); + assertSorted(output); +} + +TEST_F(BoundedSorterTest, WrongInput) { + std::vector<Doc> input = { + {3}, + {4}, + {5}, + {10}, + {15}, + // This 1 is too far out of order: it's more than 10 away from 15. + // So it will appear too late in the output. + // We will still be hanging on to anything in the range [5, inf). + // So we will have already returned 3, 4. + {1}, + {16}, + }; + + // Disable input order checking so we can see what happens. + sorter.checkInput = false; + auto output = sort(input); + ASSERT_EQ(output.size(), 7); + + ASSERT_EQ(output[0].time, 3); + ASSERT_EQ(output[1].time, 4); + ASSERT_EQ(output[2].time, 1); // Out of order. + ASSERT_EQ(output[3].time, 5); + ASSERT_EQ(output[4].time, 10); + ASSERT_EQ(output[5].time, 15); + ASSERT_EQ(output[6].time, 16); + + // Test that by default, bad input like this would be detected. + sorter = S{{}, {}}; + ASSERT(sorter.checkInput); + ASSERT_THROWS_CODE(sort(input), DBException, 6369910); +} + + } // namespace } // namespace sorter } // namespace mongo |