summaryrefslogtreecommitdiff
path: root/src/mongo/db/sorter
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2022-01-28 23:33:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-16 18:12:45 +0000
commit65348d1c32d7b46eea6e8120e56337c4475afc49 (patch)
tree522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src/mongo/db/sorter
parent729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff)
downloadmongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src/mongo/db/sorter')
-rw-r--r--src/mongo/db/sorter/sorter.h120
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp143
2 files changed, 263 insertions, 0 deletions
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 8a0c696adde..945b50e4d6a 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -35,12 +35,14 @@
#include <deque>
#include <fstream>
#include <memory>
+#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "mongo/bson/util/builder.h"
#include "mongo/db/sorter/sorter_gen.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/bufreader.h"
/**
@@ -367,6 +369,124 @@ protected:
};
/**
+ * Sorts data that is already "almost sorted", meaning we can put a bound on how out-of-order
+ * any two input elements are. For example, maybe we are sorting by {time: 1} and we know that no
+ * two documents are more than an hour out of order. This means as soon as we see {time: t}, we know
+ * that any document earlier than {time: t - 1h} is safe to return.
+ *
+ * Note what's bounded is the difference in sort-key values, not the number of inversions.
+ * This means we don't know how much space we'll need.
+ *
+ * This is not a subclass of Sorter because the interface is different: Sorter has a strict
+ * separation between reading input and returning results, while BoundedSorter can alternate
+ * between the two.
+ *
+ * Comparator does a 3-way comparison between two Keys: comp(x, y) < 0 iff x < y.
+ *
+ * BoundMaker takes a Key from the input, and computes a bound. The bound is a Key that is
+ * less-or-equal to all future Keys that will be seen in the input.
+ */
+template <typename Key, typename Value, typename Comparator, typename BoundMaker>
+class BoundedSorter {
+public:
+    // 'Comparator' is a 3-way comparison, but std::priority_queue wants a '<' comparison.
+    // But also, std::priority_queue is a max-heap, and we want a min-heap.
+    // And also, 'Comparator' compares Keys, but std::priority_queue calls its comparator
+    // on whole elements.
+    // 'Greater' adapts for all three: it orders whole (Key, Value) pairs by Key, with the
+    // comparison reversed so that the pair with the smallest Key ends up at the top of the heap.
+    struct Greater {
+        bool operator()(const std::pair<Key, Value>& p1, const std::pair<Key, Value>& p2) const {
+            return compare(p1.first, p2.first) > 0;
+        }
+        Comparator compare;
+    };
+
+    // 'comp' orders Keys; 'makeBound' maps an input Key to a bound (see add()).
+    // Both are stored by value, and the heap gets its own copy of the comparator.
+    // 'compare' is declared (and therefore initialized) before '_heap', so 'comp' is copied
+    // into 'compare' before it is moved into the heap's comparator.
+    BoundedSorter(Comparator comp, BoundMaker makeBound)
+        : compare(comp), makeBound(std::move(makeBound)), _heap(Greater{std::move(comp)}) {}
+
+    // Feed one item of input to the sorter.
+    // Together, add() and done() represent the input stream.
+    // Must not be called after done(). If 'checkInput' is set and 'key' sorts before the
+    // current bound, this uasserts with code 6369910.
+    void add(Key key, Value value) {
+        invariant(!_done);
+        // If a new value violates what we thought was our min bound, something has gone wrong.
+        // There is no bound to violate until the first item has been added.
+        if (checkInput && _min)
+            uassert(6369910, "BoundedSorter input is too out-of-order.", compare(*_min, key) <= 0);
+
+        // Each new item can potentially give us a tighter bound (a higher min).
+        // 'newMin' is not used again, so move it into place instead of copying the Key.
+        Key newMin = makeBound(key);
+        if (!_min || compare(*_min, newMin) < 0)
+            _min = std::move(newMin);
+
+        _heap.emplace(std::move(key), std::move(value));
+    }
+
+    // Indicate that no more input will arrive.
+    // Together, add() and done() represent the input stream.
+    // May be called only once.
+    void done() {
+        invariant(!_done);
+        _done = true;
+    }
+
+    enum class State {
+        // An output document is not available yet, but this may change as more input arrives.
+        kWait,
+        // An output document is available now: you may call next() once.
+        kReady,
+        // All output has been returned.
+        kDone,
+    };
+    // Together, getState() and next() represent the output stream.
+    // See BoundedSorter::State for the meaning of each case.
+    State getState() const {
+        if (_done) {
+            // No more input will arrive, so we're never in state kWait.
+            return _heap.empty() ? State::kDone : State::kReady;
+        } else {
+            if (_heap.empty())
+                return State::kWait;
+            // add() sets _min before it pushes, so a non-empty heap implies a bound exists.
+            dassert(_min);
+
+            // _heap.top() is the min of _heap, but we also need to consider whether a smaller input
+            // will arrive later. So _heap.top() is safe to return only if _heap.top() < _min.
+            if (compare(_heap.top().first, *_min) < 0)
+                return State::kReady;
+
+            // A later call to add() may improve _min. Or in the worst case, after done() is called
+            // we will return everything in _heap.
+            return State::kWait;
+        }
+    }
+
+    // Remove and return one item of output.
+    // Only valid to call when getState() == kReady.
+    // Together, getState() and next() represent the output stream.
+    std::pair<Key, Value> next() {
+        dassert(getState() == State::kReady);
+        // std::priority_queue::top() only exposes a const reference, so this copies the pair
+        // before the element is popped.
+        auto result = _heap.top();
+        _heap.pop();
+        return result;
+    }
+
+    // Number of items currently buffered: added but not yet returned by next().
+    size_t size() const {
+        return _heap.size();
+    }
+
+    // By default, uassert that the input meets our assumptions of being almost-sorted.
+    // But if checkInput is false, don't do that check.
+    // The output will be in the wrong order but otherwise it should work.
+    bool checkInput = true;
+
+    // 3-way comparison of Keys, used for the input check and for deciding what is ready.
+    Comparator compare;
+    // Computes a bound from each input Key; see add().
+    BoundMaker makeBound;
+
+private:
+    using KV = std::pair<Key, Value>;
+    // Min-heap (by Key) of items that have been added but not yet returned.
+    std::priority_queue<KV, std::vector<KV>, Greater> _heap;
+
+    // Highest bound computed so far; empty until the first call to add().
+    boost::optional<Key> _min;
+    // Set by done(); add() must not be called once this is true.
+    bool _done = false;
+};
+
+/**
* Appends a pre-sorted range of data to a given file and hands back an Iterator over that file
* range.
*/
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 5c3c1ed74a3..8fa5704239f 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -902,6 +902,149 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
}
}
+class BoundedSorterTest : public unittest::Test {
+public:
+    using Key = int;
+    // Minimal stand-in for a document: its only field is the sort key.
+    struct Doc {
+        Key time;
+
+        // const-qualified so that const Docs and temporaries can be compared as well.
+        bool operator==(const Doc& other) const {
+            return time == other.time;
+        }
+    };
+    // 3-way comparison of Keys: negative, zero or positive for x < y, x == y, x > y.
+    struct Comparator {
+        int operator()(Key x, Key y) const {
+            // Deliberately not 'x - y': that is signed overflow (undefined behavior) when the
+            // two keys are far apart. Only the sign of the result matters to the sorter.
+            return (x > y) - (x < y);
+        }
+    };
+    // The bound for a key k is k - 10: the tests treat inputs as at most 10 out of order.
+    struct BoundMaker {
+        Key operator()(Key k) const {
+            return k - 10;
+        }
+    };
+    using S = BoundedSorter<Key, Doc, Comparator, BoundMaker>;
+
+    /**
+     * Feed the input into the sorter one-by-one, taking any output as soon as it's available.
+     * Returns the documents in the order the sorter produced them. Asserts that the sorter
+     * finishes in state kDone and that it returned as many documents as it was given.
+     */
+    std::vector<Doc> sort(std::vector<Doc> input) {
+        std::vector<Doc> output;
+        auto drain = [&] {
+            while (sorter.getState() == S::State::kReady)
+                output.push_back(sorter.next().second);
+        };
+
+        for (const auto& doc : input) {
+            sorter.add(doc.time, doc);
+            drain();
+        }
+        sorter.done();
+
+        // After done() the sorter never waits, so this empties it completely.
+        drain();
+        ASSERT(sorter.getState() == S::State::kDone);
+
+        ASSERT(output.size() == input.size());
+        return output;
+    }
+
+    // Asserts that 'docs' is in non-decreasing order of 'time'.
+    static void assertSorted(const std::vector<Doc>& docs) {
+        for (size_t i = 1; i < docs.size(); ++i) {
+            const Doc& prev = docs[i - 1];
+            const Doc& curr = docs[i];
+            ASSERT_LTE(prev.time, curr.time);
+        }
+    }
+
+    // Fresh sorter per test, with default-constructed Comparator and BoundMaker.
+    S sorter{{}, {}};
+};
+TEST_F(BoundedSorterTest, Empty) {
+    // Nothing has been added and the input is still open, so the sorter can only wait.
+    const auto beforeDone = sorter.getState();
+    ASSERT(beforeDone == S::State::kWait);
+
+    // Closing an empty input stream leaves nothing to return.
+    sorter.done();
+    const auto afterDone = sorter.getState();
+    ASSERT(afterDone == S::State::kDone);
+}
+TEST_F(BoundedSorterTest, Sorted) {
+    // Input that is already ascending must come back out ascending.
+    const std::vector<Doc> input{{0}, {3}, {10}, {11}, {12}, {13}, {14}, {15}, {16}};
+    assertSorted(sort(input));
+}
+
+TEST_F(BoundedSorterTest, SortedExceptOne) {
+    // Ascending input, except that 11 and 12 arrive swapped.
+    const std::vector<Doc> input{{0}, {3}, {10}, {12}, {11}, {13}, {14}, {15}, {16}};
+    assertSorted(sort(input));
+}
+
+TEST_F(BoundedSorterTest, AlmostSorted) {
+    // Several items arrive out of order, but none by more than the sorter tolerates.
+    // 0 and 11 cannot swap, and neither can 3 and 14, so each of those pairs stays in order.
+    const std::vector<Doc> input{{0}, {11}, {13}, {10}, {12}, {3}, {14}, {15}, {16}};
+    assertSorted(sort(input));
+}
+
+TEST_F(BoundedSorterTest, WrongInput) {
+    // The 1 is too far out of order: it arrives after 15 and is more than 10 away from it.
+    // By then 3 and 4 have already been returned and the sorter is still holding everything
+    // in the range [5, inf), so the 1 shows up too late in the output.
+    const std::vector<Doc> input{{3}, {4}, {5}, {10}, {15}, {1}, {16}};
+
+    // First run with input order checking disabled, to observe the resulting output order.
+    sorter.checkInput = false;
+    const std::vector<Doc> output = sort(input);
+    ASSERT_EQ(output.size(), 7);
+
+    ASSERT_EQ(output[0].time, 3);
+    ASSERT_EQ(output[1].time, 4);
+    ASSERT_EQ(output[2].time, 1);  // Out of order.
+    ASSERT_EQ(output[3].time, 5);
+    ASSERT_EQ(output[4].time, 10);
+    ASSERT_EQ(output[5].time, 15);
+    ASSERT_EQ(output[6].time, 16);
+
+    // Then run the same input through a fresh sorter, which checks by default and must
+    // reject it.
+    sorter = S{{}, {}};
+    ASSERT(sorter.checkInput);
+    ASSERT_THROWS_CODE(sort(input), DBException, 6369910);
+}
+
+
} // namespace
} // namespace sorter
} // namespace mongo