diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2021-02-24 22:46:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-25 02:48:35 +0000 |
commit | 328891a7209346221ab1bfc3e5cb3c33a8d4f1cb (patch) | |
tree | c0ad15f530d1dc417678733547f2335122290123 | |
parent | 364c92daaa4666e6428b6e0b805d71a01edf395c (diff) | |
download | mongo-328891a7209346221ab1bfc3e5cb3c33a8d4f1cb.tar.gz |
SERVER-54170 Add rank accumulators
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 52 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_rank.cpp | 125 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_test.cpp | 69 |
4 files changed, 244 insertions, 3 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 82279be16cd..806a5d01718 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -124,6 +124,7 @@ env.Library( 'accumulator_merge_objects.cpp', 'accumulator_min_max.cpp', 'accumulator_push.cpp', + 'accumulator_rank.cpp', 'accumulator_std_dev.cpp', 'accumulator_sum.cpp', 'window_function/window_bounds.cpp', diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index bf5f7e88b2c..496b5d07d38 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -224,6 +224,58 @@ private: Value _last; }; +class AccumulatorRankBase : public AccumulatorState { +public: + explicit AccumulatorRankBase(ExpressionContext* const expCtx); + void reset(); + + bool isAssociative() const final { + tasserted(5417004, + str::stream() << "Invalid call to isAssociative in accumulator " << getOpName()); + } + bool isCommutative() const final { + tasserted(5417000, + str::stream() << "Invalid call to isCommutative in accumulator " << getOpName()); + } + + Value getValue(bool toBeMerged) final { + return Value::createIntOrLong(_lastRank); + } + +protected: + long long _lastRank = 0; + boost::optional<Value> _lastInput = boost::none; +}; + +class AccumulatorRank : public AccumulatorRankBase { +public: + explicit AccumulatorRank(ExpressionContext* const expCtx) : AccumulatorRankBase(expCtx) {} + void processInternal(const Value& input, bool merging) final; + const char* getOpName() const final; + static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); + void reset() final; + +private: + size_t _numSameRank = 1; +}; + +class AccumulatorDocumentNumber : public AccumulatorRankBase { +public: + explicit AccumulatorDocumentNumber(ExpressionContext* const expCtx) + : AccumulatorRankBase(expCtx) {} + void processInternal(const Value& input, bool merging) final; + const char* getOpName() const 
final; + static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); +}; + +class AccumulatorDenseRank : public AccumulatorRankBase { +public: + explicit AccumulatorDenseRank(ExpressionContext* const expCtx) : AccumulatorRankBase(expCtx) {} + void processInternal(const Value& input, bool merging) final; + const char* getOpName() const final; + static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); +}; + class AccumulatorSum final : public AccumulatorState { public: explicit AccumulatorSum(ExpressionContext* const expCtx); diff --git a/src/mongo/db/pipeline/accumulator_rank.cpp b/src/mongo/db/pipeline/accumulator_rank.cpp new file mode 100644 index 00000000000..32496ea7802 --- /dev/null +++ b/src/mongo/db/pipeline/accumulator_rank.cpp @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <cmath> +#include <limits> + +#include "mongo/db/pipeline/accumulator.h" + +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/window_function_expression.h" +#include "mongo/util/summation.h" + +namespace mongo { + +using boost::intrusive_ptr; + +// These don't make sense as accumulators, so only register them as window functions. +// TODO SERVER-53716 Enable rank function parsing. +// REGISTER_WINDOW_FUNCTION(rank, +// window_function::ExpressionFromAccumulator<AccumulatorRank>::parse); +// REGISTER_WINDOW_FUNCTION(denseRank, +// window_function::ExpressionFromAccumulator<AccumulatorDenseRank>::parse); +// REGISTER_WINDOW_FUNCTION( +// documentNumber, +// window_function::ExpressionFromAccumulator<AccumulatorDocumentNumber>::parse); + +const char* AccumulatorRank::getOpName() const { + return "$rank"; +} + +void AccumulatorRank::processInternal(const Value& input, bool merging) { + tassert(5417001, "$rank can't be merged", !merging); + if (!_lastInput || + getExpressionContext()->getValueComparator().compare(_lastInput.get(), input) != 0) { + _lastRank += _numSameRank; + _numSameRank = 1; + _lastInput = input; + _memUsageBytes = sizeof(*this) + _lastInput->getApproximateSize() - sizeof(Value); + } else { + ++_numSameRank; + } +} + +const char* AccumulatorDocumentNumber::getOpName() const { + return "$documentNumber"; +} + +void AccumulatorDocumentNumber::processInternal(const Value& input, bool merging) { + 
tassert(5417002, "$documentNumber can't be merged", !merging); + // DocumentNumber doesn't need to keep track of what we just saw. + ++_lastRank; +} + +const char* AccumulatorDenseRank::getOpName() const { + return "$denseRank"; +} + +void AccumulatorDenseRank::processInternal(const Value& input, bool merging) { + tassert(5417003, "$denseRank can't be merged", !merging); + if (!_lastInput || + getExpressionContext()->getValueComparator().compare(_lastInput.get(), input) != 0) { + ++_lastRank; + _lastInput = input; + _memUsageBytes = sizeof(*this) + _lastInput->getApproximateSize() - sizeof(Value); + } +} + +intrusive_ptr<AccumulatorState> AccumulatorRank::create(ExpressionContext* const expCtx) { + return new AccumulatorRank(expCtx); +} + +intrusive_ptr<AccumulatorState> AccumulatorDenseRank::create(ExpressionContext* const expCtx) { + return new AccumulatorDenseRank(expCtx); +} + +intrusive_ptr<AccumulatorState> AccumulatorDocumentNumber::create(ExpressionContext* const expCtx) { + return new AccumulatorDocumentNumber(expCtx); +} + +AccumulatorRankBase::AccumulatorRankBase(ExpressionContext* const expCtx) + : AccumulatorState(expCtx) { + _memUsageBytes = sizeof(*this); +} + +void AccumulatorRankBase::reset() { + _lastInput = boost::none; + _lastRank = 0; +} + +void AccumulatorRank::reset() { + _lastInput = boost::none; + _numSameRank = 1; + _lastRank = 0; +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index dbae124306d..9a08f5984d2 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -56,7 +56,8 @@ using std::string; template <typename AccName> static void assertExpectedResults( ExpressionContext* const expCtx, - std::initializer_list<std::pair<std::vector<Value>, Value>> operations) { + std::initializer_list<std::pair<std::vector<Value>, Value>> operations, + bool skipMerging = false) { for (auto&& op : operations) { try { // 
Asserts that result equals expected result when not sharded. @@ -71,7 +72,7 @@ static void assertExpectedResults( } // Asserts that result equals expected result when all input is on one shard. - { + if (!skipMerging) { auto accum = AccName::create(expCtx); auto shard = AccName::create(expCtx); for (auto&& val : op.first) { @@ -84,7 +85,7 @@ static void assertExpectedResults( } // Asserts that result equals expected result when each input is on a separate shard. - { + if (!skipMerging) { auto accum = AccName::create(expCtx); for (auto&& val : op.first) { auto shard = AccName::create(expCtx); @@ -336,6 +337,68 @@ TEST(Accumulators, Sum) { {{Value(9), Value()}, Value(9)}}); } +TEST(Accumulators, Rank) { + auto expCtx = ExpressionContextForTest{}; + assertExpectedResults<AccumulatorRank>( + &expCtx, + { + // Document number is correct. + {{Value(0)}, Value(1)}, + {{Value(0), Value(2)}, Value(2)}, + {{Value(0), Value(2), Value(4)}, Value(3)}, + // Ties don't increment + {{Value(1), Value(1)}, Value(1)}, + // Ties skip next value correctly. + {{Value(1), Value(1), Value(3)}, Value(3)}, + {{Value(1), Value(1), Value(1), Value(3)}, Value(4)}, + {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(6)}, + // Expected results with empty values. + {{Value{}}, Value(1)}, + {{Value{}, Value{}}, Value(1)}, + + }, + true /* rank can't be merged */); +} + +TEST(Accumulators, DenseRank) { + auto expCtx = ExpressionContextForTest{}; + assertExpectedResults<AccumulatorDenseRank>( + &expCtx, + { + // Document number is correct. + {{Value(0)}, Value(1)}, + {{Value(0), Value(2)}, Value(2)}, + {{Value(0), Value(2), Value(4)}, Value(3)}, + // Ties don't increment + {{Value(1), Value(1)}, Value(1)}, + // Ties don't skip values. 
+ {{Value(1), Value(1), Value(3)}, Value(2)}, + {{Value(1), Value(1), Value(1), Value(3)}, Value(2)}, + {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(3)}, + + }, + true /* denseRank can't be merged */); +} + +TEST(Accumulators, DocumentNumberRank) { + auto expCtx = ExpressionContextForTest{}; + assertExpectedResults<AccumulatorDocumentNumber>( + &expCtx, + { + // Document number is correct. + {{Value(0)}, Value(1)}, + {{Value(0), Value(2)}, Value(2)}, + {{Value(0), Value(2), Value(4)}, Value(3)}, + // Ties increment + {{Value(1), Value(1)}, Value(2)}, + {{Value(1), Value(1), Value(3)}, Value(3)}, + {{Value(1), Value(1), Value(1), Value(3)}, Value(4)}, + {{Value(1), Value(1), Value(1), Value(3), Value(3), Value(7)}, Value(6)}, + + }, + true /* documentNumber can't be merged */); +} + TEST(Accumulators, AddToSetRespectsCollation) { auto expCtx = ExpressionContextForTest{}; auto collator = |