summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2021-02-24 22:46:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-25 02:48:35 +0000
commit328891a7209346221ab1bfc3e5cb3c33a8d4f1cb (patch)
treec0ad15f530d1dc417678733547f2335122290123
parent364c92daaa4666e6428b6e0b805d71a01edf395c (diff)
downloadmongo-328891a7209346221ab1bfc3e5cb3c33a8d4f1cb.tar.gz
SERVER-54170 Add rank accumulators
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/accumulator.h52
-rw-r--r--src/mongo/db/pipeline/accumulator_rank.cpp125
-rw-r--r--src/mongo/db/pipeline/accumulator_test.cpp69
4 files changed, 244 insertions, 3 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 82279be16cd..806a5d01718 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -124,6 +124,7 @@ env.Library(
'accumulator_merge_objects.cpp',
'accumulator_min_max.cpp',
'accumulator_push.cpp',
+ 'accumulator_rank.cpp',
'accumulator_std_dev.cpp',
'accumulator_sum.cpp',
'window_function/window_bounds.cpp',
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index bf5f7e88b2c..496b5d07d38 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -224,6 +224,58 @@ private:
Value _last;
};
+class AccumulatorRankBase : public AccumulatorState {
+public:
+ explicit AccumulatorRankBase(ExpressionContext* const expCtx);
+ void reset();
+
+ bool isAssociative() const final {
+ tasserted(5417004,
+ str::stream() << "Invalid call to isAssociative in accumulator " << getOpName());
+ }
+ bool isCommutative() const final {
+ tasserted(5417000,
+ str::stream() << "Invalid call to isCommutative in accumulator " << getOpName());
+ }
+
+ Value getValue(bool toBeMerged) final {
+ return Value::createIntOrLong(_lastRank);
+ }
+
+protected:
+ long long _lastRank = 0;
+ boost::optional<Value> _lastInput = boost::none;
+};
+
+class AccumulatorRank : public AccumulatorRankBase {
+public:
+ explicit AccumulatorRank(ExpressionContext* const expCtx) : AccumulatorRankBase(expCtx) {}
+ void processInternal(const Value& input, bool merging) final;
+ const char* getOpName() const final;
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
+ void reset() final;
+
+private:
+ size_t _numSameRank = 1;
+};
+
+class AccumulatorDocumentNumber : public AccumulatorRankBase {
+public:
+ explicit AccumulatorDocumentNumber(ExpressionContext* const expCtx)
+ : AccumulatorRankBase(expCtx) {}
+ void processInternal(const Value& input, bool merging) final;
+ const char* getOpName() const final;
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
+};
+
+class AccumulatorDenseRank : public AccumulatorRankBase {
+public:
+ explicit AccumulatorDenseRank(ExpressionContext* const expCtx) : AccumulatorRankBase(expCtx) {}
+ void processInternal(const Value& input, bool merging) final;
+ const char* getOpName() const final;
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
+};
+
class AccumulatorSum final : public AccumulatorState {
public:
explicit AccumulatorSum(ExpressionContext* const expCtx);
diff --git a/src/mongo/db/pipeline/accumulator_rank.cpp b/src/mongo/db/pipeline/accumulator_rank.cpp
new file mode 100644
index 00000000000..32496ea7802
--- /dev/null
+++ b/src/mongo/db/pipeline/accumulator_rank.cpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <cmath>
+#include <limits>
+
+#include "mongo/db/pipeline/accumulator.h"
+
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/window_function_expression.h"
+#include "mongo/util/summation.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+
+// These don't make sense as accumulators, so only register them as window functions.
+// TODO SERVER-53716 Enable rank function parsing.
+// REGISTER_WINDOW_FUNCTION(rank,
+// window_function::ExpressionFromAccumulator<AccumulatorRank>::parse);
+// REGISTER_WINDOW_FUNCTION(denseRank,
+// window_function::ExpressionFromAccumulator<AccumulatorDenseRank>::parse);
+// REGISTER_WINDOW_FUNCTION(
+// documentNumber,
+// window_function::ExpressionFromAccumulator<AccumulatorDocumentNumber>::parse);
+
+const char* AccumulatorRank::getOpName() const {
+ return "$rank";
+}
+
+void AccumulatorRank::processInternal(const Value& input, bool merging) {
+ tassert(5417001, "$rank can't be merged", !merging);
+ if (!_lastInput ||
+ getExpressionContext()->getValueComparator().compare(_lastInput.get(), input) != 0) {
+ _lastRank += _numSameRank;
+ _numSameRank = 1;
+ _lastInput = input;
+ _memUsageBytes = sizeof(*this) + _lastInput->getApproximateSize() - sizeof(Value);
+ } else {
+ ++_numSameRank;
+ }
+}
+
+const char* AccumulatorDocumentNumber::getOpName() const {
+ return "$documentNumber";
+}
+
+void AccumulatorDocumentNumber::processInternal(const Value& input, bool merging) {
+ tassert(5417002, "$documentNumber can't be merged", !merging);
+ // DocumentNumber doesn't need to keep track of what we just saw.
+ ++_lastRank;
+}
+
+const char* AccumulatorDenseRank::getOpName() const {
+ return "$denseRank";
+}
+
+void AccumulatorDenseRank::processInternal(const Value& input, bool merging) {
+ tassert(5417003, "$denseRank can't be merged", !merging);
+ if (!_lastInput ||
+ getExpressionContext()->getValueComparator().compare(_lastInput.get(), input) != 0) {
+ ++_lastRank;
+ _lastInput = input;
+ _memUsageBytes = sizeof(*this) + _lastInput->getApproximateSize() - sizeof(Value);
+ }
+}
+
+intrusive_ptr<AccumulatorState> AccumulatorRank::create(ExpressionContext* const expCtx) {
+ return new AccumulatorRank(expCtx);
+}
+
+intrusive_ptr<AccumulatorState> AccumulatorDenseRank::create(ExpressionContext* const expCtx) {
+ return new AccumulatorDenseRank(expCtx);
+}
+
+intrusive_ptr<AccumulatorState> AccumulatorDocumentNumber::create(ExpressionContext* const expCtx) {
+ return new AccumulatorDocumentNumber(expCtx);
+}
+
+AccumulatorRankBase::AccumulatorRankBase(ExpressionContext* const expCtx)
+ : AccumulatorState(expCtx) {
+ _memUsageBytes = sizeof(*this);
+}
+
+void AccumulatorRankBase::reset() {
+ _lastInput = boost::none;
+ _lastRank = 0;
+}
+
+void AccumulatorRank::reset() {
+ _lastInput = boost::none;
+ _numSameRank = 1;
+ _lastRank = 0;
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index dbae124306d..9a08f5984d2 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -56,7 +56,8 @@ using std::string;
template <typename AccName>
static void assertExpectedResults(
ExpressionContext* const expCtx,
- std::initializer_list<std::pair<std::vector<Value>, Value>> operations) {
+ std::initializer_list<std::pair<std::vector<Value>, Value>> operations,
+ bool skipMerging = false) {
for (auto&& op : operations) {
try {
// Asserts that result equals expected result when not sharded.
@@ -71,7 +72,7 @@ static void assertExpectedResults(
}
// Asserts that result equals expected result when all input is on one shard.
- {
+ if (!skipMerging) {
auto accum = AccName::create(expCtx);
auto shard = AccName::create(expCtx);
for (auto&& val : op.first) {
@@ -84,7 +85,7 @@ static void assertExpectedResults(
}
// Asserts that result equals expected result when each input is on a separate shard.
- {
+ if (!skipMerging) {
auto accum = AccName::create(expCtx);
for (auto&& val : op.first) {
auto shard = AccName::create(expCtx);
@@ -336,6 +337,68 @@ TEST(Accumulators, Sum) {
{{Value(9), Value()}, Value(9)}});
}
+TEST(Accumulators, Rank) {
+ auto expCtx = ExpressionContextForTest{};
+ assertExpectedResults<AccumulatorRank>(
+ &expCtx,
+ {
+ // Document number is correct.
+ {{Value(0)}, Value(1)},
+ {{Value(0), Value(2)}, Value(2)},
+ {{Value(0), Value(2), Value(4)}, Value(3)},
+ // Ties don't increment
+ {{Value(1), Value(1)}, Value(1)},
+ // Ties skip next value correctly.
+ {{Value(1), Value(1), Value(3)}, Value(3)},
+ {{Value(1), Value(1), Value(1), Value(3)}, Value(4)},
+ {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(6)},
+ // Expected results with empty values.
+ {{Value{}}, Value(1)},
+ {{Value{}, Value{}}, Value(1)},
+
+ },
+ true /* rank can't be merged */);
+}
+
+TEST(Accumulators, DenseRank) {
+ auto expCtx = ExpressionContextForTest{};
+ assertExpectedResults<AccumulatorDenseRank>(
+ &expCtx,
+ {
+ // Document number is correct.
+ {{Value(0)}, Value(1)},
+ {{Value(0), Value(2)}, Value(2)},
+ {{Value(0), Value(2), Value(4)}, Value(3)},
+ // Ties don't increment
+ {{Value(1), Value(1)}, Value(1)},
+ // Ties don't skip values.
+ {{Value(1), Value(1), Value(3)}, Value(2)},
+ {{Value(1), Value(1), Value(1), Value(3)}, Value(2)},
+ {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(3)},
+
+ },
+ true /* denseRank can't be merged */);
+}
+
+TEST(Accumulators, DocumentNumberRank) {
+ auto expCtx = ExpressionContextForTest{};
+ assertExpectedResults<AccumulatorDocumentNumber>(
+ &expCtx,
+ {
+ // Document number is correct.
+ {{Value(0)}, Value(1)},
+ {{Value(0), Value(2)}, Value(2)},
+ {{Value(0), Value(2), Value(4)}, Value(3)},
+ // Ties increment
+ {{Value(1), Value(1)}, Value(2)},
+ {{Value(1), Value(1), Value(3)}, Value(3)},
+ {{Value(1), Value(1), Value(1), Value(3)}, Value(4)},
+ {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(6)},
+
+ },
+ true /* denseRank can't be merged */);
+}
+
TEST(Accumulators, AddToSetRespectsCollation) {
auto expCtx = ExpressionContextForTest{};
auto collator =