summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2022-01-28 23:33:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-16 18:12:45 +0000
commit65348d1c32d7b46eea6e8120e56337c4475afc49 (patch)
tree522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src
parent729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff)
downloadmongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp98
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h21
-rw-r--r--src/mongo/db/sorter/sorter.h120
-rw-r--r--src/mongo/db/sorter/sorter_test.cpp143
4 files changed, 382 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index b47fb4036f0..9a9f6aeb7d0 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -43,8 +44,10 @@
#include "mongo/db/pipeline/skip_and_limit.h"
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/logv2/log.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -75,8 +78,47 @@ REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSort::createFromBson,
AllowedWithApiStrict::kAlways);
+REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(
+ _internalBoundedSort,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceSort::parseBoundedSort,
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::
+ none /* TODO SERVER-52286 feature_flags::gFeatureFlagBucketUnpackWithSort.getVersion() */,
+ feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabledAndIgnoreFCV());
+
DocumentSource::GetNextResult DocumentSourceSort::doGetNext() {
+ if (_timeSorter) {
+ // Only pull input as necessary to get _timeSorter to have a result.
+ while (_timeSorter->getState() == TimeSorter::State::kWait) {
+ auto input = pSource->getNext();
+ switch (input.getStatus()) {
+ case GetNextResult::ReturnStatus::kPauseExecution:
+ return input;
+ case GetNextResult::ReturnStatus::kEOF:
+ // Tell _timeSorter there will be no more input. In response, its state
+ // will never be kWait again, and so we'll never call pSource->getNext() again.
+ _timeSorter->done();
+ continue;
+ case GetNextResult::ReturnStatus::kAdvanced:
+ Document doc = input.getDocument();
+ auto time = doc.getField(_sortExecutor->sortPattern()[0].fieldPath->fullPath());
+ uassert(6369909,
+ "$_internalBoundedSort only handles Date values",
+ time.getType() == Date);
+ _timeSorter->add(time.getDate(), doc);
+ continue;
+ }
+ }
+
+ if (_timeSorter->getState() == TimeSorter::State::kDone)
+ return GetNextResult::makeEOF();
+
+ return _timeSorter->next().second;
+ }
+
if (!_populated) {
const auto populationResult = populate();
if (populationResult.isPaused()) {
@@ -101,6 +143,25 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSort::clone() const {
void DocumentSourceSort::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ if (_timeSorter) {
+ tassert(6369900,
+ "$_internalBoundedSort should not absorb a $limit",
+ !_sortExecutor->hasLimit());
+
+ // TODO SERVER-63637 Implement execution stats.
+
+ // {$_internalBoundedSort: {sortKey, bound}}
+ auto sortKey = _sortExecutor->sortPattern().serialize(
+ SortPattern::SortKeySerialization::kForPipelineSerialization);
+ array.push_back(Value{Document{{
+ {"$_internalBoundedSort"_sd,
+ Document{{
+ {"sortKey"_sd, std::move(sortKey)},
+ {"bound"_sd, durationCount<Seconds>(_timeSorter->makeBound.bound)},
+ }}},
+ }}});
+ return;
+ }
uint64_t limit = _sortExecutor->getLimit();
@@ -145,6 +206,11 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (_timeSorter) {
+ // Do not absorb a limit, or combine with other sort stages.
+ return std::next(itr);
+ }
+
auto stageItr = std::next(itr);
auto limit = extractLimitForPushdown(stageItr, container);
if (limit)
@@ -209,6 +275,33 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
return pSort;
}
+intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(6369905,
+ "the $_internalBoundedSort key specification must be an object",
+ elem.type() == Object);
+ BSONObj args = elem.embeddedObject();
+
+ BSONElement key = args["sortKey"];
+ uassert(6369904, "$_internalBoundedSort sortKey must be an object", key.type() == Object);
+ SortPattern pat{key.embeddedObject(), expCtx};
+ uassert(6369903, "$_internalBoundedSort doesn't support compound sort", pat.size() == 1);
+ uassert(6369902, "$_internalBoundedSort only handles ascending sort", pat[0].isAscending);
+ uassert(6369901,
+ "$_internalBoundedSort doesn't support an expression in the sortKey",
+ pat[0].expression == nullptr);
+ uassert(6369907,
+ "$_internalBoundedSort doesn't support dotted field names",
+ pat[0].fieldPath->getPathLength() == 1);
+
+ BSONElement bound = args["bound"];
+ int boundN = uassertStatusOK(bound.parseIntegerElementToNonNegativeInt());
+
+ auto ds = DocumentSourceSort::create(expCtx, pat);
+ ds->_timeSorter.reset(new TimeSorter{{}, BoundMaker{Seconds{boundN}}});
+ return ds;
+}
+
DocumentSource::GetNextResult DocumentSourceSort::populate() {
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
@@ -260,6 +353,11 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
}
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() {
+ uassert(6369906,
+ "$_internalBoundedSort cannot be the first stage on the merger, because it requires "
+ "almost-sorted input, which the shardsPart of a pipeline can't provide",
+ !_timeSorter);
+
DistributedPlanLogic split;
split.shardsStage = this;
split.inputSortPattern = _sortExecutor->sortPattern()
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 1660c91b9f2..1043f9a5d9a 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -113,6 +113,12 @@ public:
}
/**
+ * Parse a stage that uses BoundedSorter.
+ */
+ static boost::intrusive_ptr<DocumentSourceSort> parseBoundedSort(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
* Returns the the limit, if a subsequent $limit stage has been coalesced with this $sort stage.
* Otherwise, returns boost::none.
*/
@@ -189,6 +195,21 @@ private:
boost::optional<SortExecutor<Document>> _sortExecutor;
boost::optional<SortKeyGenerator> _sortKeyGen;
+
+ struct BoundMaker {
+ Seconds bound;
+
+ Date_t operator()(Date_t key) {
+ return key - bound;
+ }
+ };
+ struct Comp {
+ int operator()(Date_t x, Date_t y) const {
+ return x.toMillisSinceEpoch() - y.toMillisSinceEpoch();
+ }
+ };
+ using TimeSorter = BoundedSorter<Date_t, Document, Comp, BoundMaker>;
+ std::unique_ptr<TimeSorter> _timeSorter;
};
} // namespace mongo
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 8a0c696adde..945b50e4d6a 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -35,12 +35,14 @@
#include <deque>
#include <fstream>
#include <memory>
+#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "mongo/bson/util/builder.h"
#include "mongo/db/sorter/sorter_gen.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/bufreader.h"
/**
@@ -367,6 +369,124 @@ protected:
};
/**
+ * Sorts data that is already "almost sorted", meaning we can put a bound on how out-of-order
+ * any two input elements are. For example, maybe we are sorting by {time: 1} and we know that no
+ * two documents are more than an hour out of order. This means as soon as we see {time: t}, we know
+ * that any document earlier than {time: t - 1h} is safe to return.
+ *
+ * Note what's bounded is the difference in sort-key values, not the number of inversions.
+ * This means we don't know how much space we'll need.
+ *
+ * This is not a subclass of Sorter because the interface is different: Sorter has a strict
+ * separation between reading input and returning results, while BoundedSorter can alternate
+ * between the two.
+ *
+ * Comparator does a 3-way comparison between two Keys: comp(x, y) < 0 iff x < y.
+ *
+ * BoundMaker takes a Key from the input, and computes a bound. The bound is a Key that is
+ * less-or-equal to all future Keys that will be seen in the input.
+ */
+template <typename Key, typename Value, typename Comparator, typename BoundMaker>
+class BoundedSorter {
+public:
+ // 'Comparator' is a 3-way comparison, but std::priority_queue wants a '<' comparison.
+ // But also, std::priority_queue is a max-heap, and we want a min-heap.
+ // And also, 'Comparator' compares Keys, but std::priority_queue calls its comparator
+ // on whole elements.
+ struct Greater {
+ bool operator()(const std::pair<Key, Value>& p1, const std::pair<Key, Value>& p2) const {
+ return compare(p1.first, p2.first) > 0;
+ }
+ Comparator compare;
+ };
+
+ BoundedSorter(Comparator comp, BoundMaker makeBound)
+ : compare(comp), makeBound(makeBound), _heap(Greater{comp}) {}
+
+ // Feed one item of input to the sorter.
+ // Together, add() and done() represent the input stream.
+ void add(Key key, Value value) {
+ invariant(!_done);
+ // If a new value violates what we thought was our min bound, something has gone wrong.
+ if (checkInput && _min)
+ uassert(6369910, "BoundedSorter input is too out-of-order.", compare(*_min, key) <= 0);
+
+ // Each new item can potentially give us a tighter bound (a higher min).
+ Key newMin = makeBound(key);
+ if (!_min || compare(*_min, newMin) < 0)
+ _min = newMin;
+
+ _heap.emplace(std::move(key), std::move(value));
+ }
+
+ // Indicate that no more input will arrive.
+ // Together, add() and done() represent the input stream.
+ void done() {
+ invariant(!_done);
+ _done = true;
+ }
+
+ enum class State {
+ // An output document is not available yet, but this may change as more input arrives.
+ kWait,
+ // An output document is available now: you may call next() once.
+ kReady,
+ // All output has been returned.
+ kDone,
+ };
+ // Together, state() and next() represent the output stream.
+ // See BoundedSorter::State for the meaning of each case.
+ State getState() const {
+ if (_done) {
+ // No more input will arrive, so we're never in state kWait.
+ return _heap.empty() ? State::kDone : State::kReady;
+ } else {
+ if (_heap.empty())
+ return State::kWait;
+ dassert(_min);
+
+ // _heap.top() is the min of _heap, but we also need to consider whether a smaller input
+ // will arrive later. So _heap.top() is safe to return only if heap.top() < _min.
+ if (compare(_heap.top().first, *_min) < 0)
+ return State::kReady;
+
+ // A later call to add() may improve _min. Or in the worst case, after done() is called
+ // we will return everything in _heap.
+ return State::kWait;
+ }
+ }
+
+ // Remove and return one item of output.
+ // Only valid to call when getState() == kReady.
+ // Together, state() and next() represent the output stream.
+ std::pair<Key, Value> next() {
+ dassert(getState() == State::kReady);
+ auto result = _heap.top();
+ _heap.pop();
+ return result;
+ }
+
+ size_t size() const {
+ return _heap.size();
+ }
+
+ // By default, uassert that the input meets our assumptions of being almost-sorted.
+ // But if _checkInput is false, don't do that check.
+ // The output will be in the wrong order but otherwise it should work.
+ bool checkInput = true;
+
+ Comparator compare;
+ BoundMaker makeBound;
+
+private:
+ using KV = std::pair<Key, Value>;
+ std::priority_queue<KV, std::vector<KV>, Greater> _heap;
+
+ boost::optional<Key> _min;
+ bool _done = false;
+};
+
+/**
* Appends a pre-sorted range of data to a given file and hands back an Iterator over that file
* range.
*/
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 5c3c1ed74a3..8fa5704239f 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -902,6 +902,149 @@ TEST_F(SorterMakeFromExistingRangesTest, RoundTrip) {
}
}
+class BoundedSorterTest : public unittest::Test {
+public:
+ using Key = int;
+ struct Doc {
+ Key time;
+
+ bool operator==(const Doc& other) {
+ return time == other.time;
+ }
+ };
+ struct Comparator {
+ int operator()(Key x, Key y) const {
+ return x - y;
+ }
+ };
+ struct BoundMaker {
+ Key operator()(Key k) const {
+ return k - 10;
+ }
+ };
+ using S = BoundedSorter<Key, Doc, Comparator, BoundMaker>;
+
+ /**
+ * Feed the input into the sorter one-by-one, taking any output as soon as it's available.
+ */
+ std::vector<Doc> sort(std::vector<Doc> input) {
+ std::vector<Doc> output;
+ auto push = [&](Doc doc) { output.push_back(doc); };
+
+ for (auto&& doc : input) {
+ sorter.add(doc.time, doc);
+ while (sorter.getState() == S::State::kReady)
+ push(sorter.next().second);
+ }
+ sorter.done();
+
+ while (sorter.getState() == S::State::kReady)
+ push(sorter.next().second);
+ ASSERT(sorter.getState() == S::State::kDone);
+
+ ASSERT(output.size() == input.size());
+ return output;
+ }
+
+ static void assertSorted(const std::vector<Doc>& docs) {
+ for (size_t i = 1; i < docs.size(); ++i) {
+ Doc prev = docs[i - 1];
+ Doc curr = docs[i];
+ ASSERT_LTE(prev.time, curr.time);
+ }
+ }
+
+ S sorter{{}, {}};
+};
+TEST_F(BoundedSorterTest, Empty) {
+ ASSERT(sorter.getState() == S::State::kWait);
+
+ sorter.done();
+ ASSERT(sorter.getState() == S::State::kDone);
+}
+TEST_F(BoundedSorterTest, Sorted) {
+ auto output = sort({
+ {0},
+ {3},
+ {10},
+ {11},
+ {12},
+ {13},
+ {14},
+ {15},
+ {16},
+ });
+ assertSorted(output);
+}
+
+TEST_F(BoundedSorterTest, SortedExceptOne) {
+ auto output = sort({
+ {0},
+ {3},
+ {10},
+ // Swap 11 and 12.
+ {12},
+ {11},
+ {13},
+ {14},
+ {15},
+ {16},
+ });
+ assertSorted(output);
+}
+
+TEST_F(BoundedSorterTest, AlmostSorted) {
+ auto output = sort({
+ // 0 and 11 cannot swap.
+ {0},
+ {11},
+ {13},
+ {10},
+ {12},
+ // 3 and 14 cannot swap.
+ {3},
+ {14},
+ {15},
+ {16},
+ });
+ assertSorted(output);
+}
+
+TEST_F(BoundedSorterTest, WrongInput) {
+ std::vector<Doc> input = {
+ {3},
+ {4},
+ {5},
+ {10},
+ {15},
+ // This 1 is too far out of order: it's more than 10 away from 15.
+ // So it will appear too late in the output.
+ // We will still be hanging on to anything in the range [5, inf).
+ // So we will have already returned 3, 4.
+ {1},
+ {16},
+ };
+
+ // Disable input order checking so we can see what happens.
+ sorter.checkInput = false;
+ auto output = sort(input);
+ ASSERT_EQ(output.size(), 7);
+
+ ASSERT_EQ(output[0].time, 3);
+ ASSERT_EQ(output[1].time, 4);
+ ASSERT_EQ(output[2].time, 1); // Out of order.
+ ASSERT_EQ(output[3].time, 5);
+ ASSERT_EQ(output[4].time, 10);
+ ASSERT_EQ(output[5].time, 15);
+ ASSERT_EQ(output[6].time, 16);
+
+ // Test that by default, bad input like this would be detected.
+ sorter = S{{}, {}};
+ ASSERT(sorter.checkInput);
+ ASSERT_THROWS_CODE(sort(input), DBException, 6369910);
+}
+
+
} // namespace
} // namespace sorter
} // namespace mongo