diff options
author | Ruoxin Xu <ruoxin.xu@mongodb.com> | 2021-03-09 15:55:19 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-17 23:47:47 +0000 |
commit | 65faf38974f63900b1dcdc6fd1994ce3255911d4 (patch) | |
tree | bc4527e60d6054279b2d4aa7035ec2b7585c0026 /src/mongo/db | |
parent | 1640bbb7d487ef65722d98ac830090c82b7f7ee5 (diff) | |
download | mongo-65faf38974f63900b1dcdc6fd1994ce3255911d4.tar.gz |
SERVER-54240 Implement $covariance accumulator-only
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 41 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_covariance.cpp | 105 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_test.cpp | 141 |
4 files changed, 288 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b73c1e533d5..bcec0c46033 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -118,6 +118,7 @@ env.Library( 'accumulation_statement.cpp', 'accumulator_add_to_set.cpp', 'accumulator_avg.cpp', + 'accumulator_covariance.cpp', 'accumulator_first.cpp', 'accumulator_js_reduce.cpp', 'accumulator_last.cpp', diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 98ee003e129..5107445ed25 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -417,6 +417,47 @@ public: static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); }; +class AccumulatorCovariance : public AccumulatorState { +public: + AccumulatorCovariance(ExpressionContext* const expCtx, bool isSamp); + + void processInternal(const Value& input, bool merging) final; + Value getValue(bool toBeMerged) final; + void reset() final; + const char* getOpName() const final { + return (_isSamp ? "$covarianceSamp" : "$covariancePop"); + } + + bool isAssociative() const final { + tasserted(5424002, + str::stream() << "Invalid call to isAssociative in accumulator " << getOpName()); + } + bool isCommutative() const final { + tasserted(5424003, + str::stream() << "Invalid call to isCommutative in accumulator " << getOpName()); + } + +private: + bool _isSamp; + long long _count = 0; + double _meanX = 0, _meanY = 0; + double _cXY = 0; +}; + +class AccumulatorCovarianceSamp final : public AccumulatorCovariance { +public: + explicit AccumulatorCovarianceSamp(ExpressionContext* const expCtx) + : AccumulatorCovariance(expCtx, true) {} + static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); +}; + +class AccumulatorCovariancePop final : public AccumulatorCovariance { +public: + explicit AccumulatorCovariancePop(ExpressionContext* const expCtx) + : AccumulatorCovariance(expCtx, false) {} + static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); +}; + class AccumulatorMergeObjects : public AccumulatorState { public: AccumulatorMergeObjects(ExpressionContext* const expCtx); diff --git a/src/mongo/db/pipeline/accumulator_covariance.cpp b/src/mongo/db/pipeline/accumulator_covariance.cpp new file mode 100644 index 00000000000..77524035ab3 --- /dev/null +++ b/src/mongo/db/pipeline/accumulator_covariance.cpp @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <cmath> + +#include "mongo/db/pipeline/accumulator.h" + +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/window_function_expression.h" + +namespace mongo { + +REGISTER_NON_REMOVABLE_WINDOW_FUNCTION( + covarianceSamp, window_function::ExpressionFromAccumulator<AccumulatorCovarianceSamp>::parse); + +REGISTER_NON_REMOVABLE_WINDOW_FUNCTION( + covariancePop, window_function::ExpressionFromAccumulator<AccumulatorCovariancePop>::parse); + +void AccumulatorCovariance::processInternal(const Value& input, bool merging) { + tassert(5424000, "$covariance can't be merged", !merging); + + // Currently we only support array with 2 numeric values as input value. Other types of input or + // non-numeric arrays have no impact on covariance. + if (!input.isArray()) + return; + const auto& arr = input.getArray(); + if (arr.size() != 2) + return; + if (!arr[0].numeric() || !arr[1].numeric()) + return; + + // This is an implementation of the following algorithm: + // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online + double x = arr[0].coerceToDouble(); + double y = arr[1].coerceToDouble(); + double dx = x - _meanX; + + _count++; + _meanX += (dx / _count); + _meanY += (y - _meanY) / _count; + _cXY += dx * (y - _meanY); +} + +AccumulatorCovariance::AccumulatorCovariance(ExpressionContext* const expCtx, bool isSamp) + : AccumulatorState(expCtx), _isSamp(isSamp) { + _memUsageBytes = sizeof(*this); +} + +void AccumulatorCovariance::reset() { + _count = 0; + _meanX = 0; + _meanY = 0; + _cXY = 0; // 0 only makes sense if "_count" > 0. +} + +Value AccumulatorCovariance::getValue(bool toBeMerged) { + const double adjustedCount = (_isSamp ? _count - 1 : _count); + + if (adjustedCount <= 0) + return Value(BSONNULL); // Covariance not well defined in this case. + + return Value(_cXY / adjustedCount); +} + +boost::intrusive_ptr<AccumulatorState> AccumulatorCovarianceSamp::create( + ExpressionContext* const expCtx) { + return new AccumulatorCovarianceSamp(expCtx); +} + +boost::intrusive_ptr<AccumulatorState> AccumulatorCovariancePop::create( + ExpressionContext* const expCtx) { + return new AccumulatorCovariancePop(expCtx); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 9a08f5984d2..fe9fa550b6f 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include <cmath> #include <memory> #include "mongo/db/exec/document_value/document.h" @@ -431,6 +432,146 @@ TEST(Accumulators, PushRespectsMaxMemoryConstraint) { ErrorCodes::ExceededMemoryLimit); } +/* ------------------------- AccumulatorCorvariance(Samp/Pop) -------------------------- */ + +// Calculate covariance using the offline algorithm. +double offlineCovariance(const std::vector<Value>& input, bool isSamp) { + // Edge cases return 0 though 'input' should not be empty. Empty input is tested elsewhere. + if (input.size() <= 1) + return 0; + + double adjustedN = isSamp ? input.size() - 1 : input.size(); + double meanX = 0; + double meanY = 0; + double cXY = 0; + + for (auto&& value : input) { + meanX += value.getArray()[0].coerceToDouble(); + meanY += value.getArray()[1].coerceToDouble(); + } + meanX /= input.size(); + meanY /= input.size(); + + for (auto&& value : input) { + cXY += (value.getArray()[0].coerceToDouble() - meanX) * + (value.getArray()[1].coerceToDouble() - meanY); + } + + return cXY / adjustedN; +} + +// Test the accumulator-output covariance (using an online algorithm: +// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online) is equal to the +// covariance calculated based on the offline algorithm (cov(x,y) = Σ((xi-avg(x))*(yi-avg(y)))/n)). +// If 'result' is given, the covariance should also be tested against the given result. +template <typename AccName> +static void assertCovariance(ExpressionContext* const expCtx, + const std::vector<Value>& input, + boost::optional<double> result = boost::none) { + auto accum = AccName::create(expCtx); + for (auto&& val : input) { + accum->process(val, false); + } + double onlineCov = accum->getValue(false).coerceToDouble(); + double offlineCov = + offlineCovariance(input, std::is_same_v<AccName, AccumulatorCovarianceSamp>); + + ASSERT_LTE(fabs(onlineCov - offlineCov), 1e-10); + if (result) { + ASSERT_LTE(fabs(onlineCov - *result), 1e-5); + } +} + +TEST(Accumulators, CovarianceEdgeCases) { + auto expCtx = ExpressionContextForTest{}; + + // The sample covariance of variables of single value should be undefined. + const std::vector<Value> singlePoint = { + Value(std::vector<Value>({Value(0), Value(1)})), + }; + + const std::vector<Value> nanPoints = { + Value(std::vector<Value>({Value(numeric_limits<double>::quiet_NaN()), + Value(numeric_limits<double>::quiet_NaN())})), + Value(std::vector<Value>({Value(numeric_limits<double>::quiet_NaN()), + Value(numeric_limits<double>::quiet_NaN())})), + }; + + assertExpectedResults<AccumulatorCovariancePop>( + &expCtx, + { + {{}, Value(BSONNULL)}, + {singlePoint, Value(0.0)}, + {nanPoints, Value(numeric_limits<double>::quiet_NaN())}, + }, + true /* Covariance accumulator can't be merged */); + + assertExpectedResults<AccumulatorCovarianceSamp>( + &expCtx, + { + {{}, Value(BSONNULL)}, + {singlePoint, Value(BSONNULL)}, + {nanPoints, Value(numeric_limits<double>::quiet_NaN())}, + }, + true /* Covariance accumulator can't be merged */); +} + +TEST(Accumulators, PopulationCovariance) { + auto expCtx = ExpressionContextForTest{}; + + // Some doubles as input. + const std::vector<Value> multiplePoints = { + Value(std::vector<Value>({Value(0), Value(1.5)})), + Value(std::vector<Value>({Value(1.4), Value(2.5)})), + Value(std::vector<Value>({Value(4.7), Value(3.6)})), + }; + + // Test both offline and online corvariance algorithm with a given result. + assertCovariance<AccumulatorCovariancePop>(&expCtx, multiplePoints, 1.655556); +} + +TEST(Accumulators, SampleCovariance) { + auto expCtx = ExpressionContextForTest{}; + + // Some doubles as input. + std::vector<Value> multiplePoints = { + Value(std::vector<Value>({Value(0), Value(1.5)})), + Value(std::vector<Value>({Value(1.4), Value(2.5)})), + Value(std::vector<Value>({Value(4.7), Value(3.6)})), + }; + + // Test both offline and online corvariance algorithm with a given result. + assertCovariance<AccumulatorCovarianceSamp>(&expCtx, multiplePoints, 2.483334); +} + +std::vector<Value> generateRandomVariables() { + auto seed = Date_t::now().asInt64(); + LOGV2(5424001, "Generated new seed is {seed}", "seed"_attr = seed); + + std::vector<Value> output; + PseudoRandom prng(seed); + const int variableSize = prng.nextInt32(1000) + 2; + + for (int i = 0; i < variableSize; i++) { + std::vector<Value> newXY; + newXY.push_back(Value(prng.nextCanonicalDouble())); + newXY.push_back(Value(prng.nextCanonicalDouble())); + output.push_back(Value(newXY)); + } + + return output; +} + +TEST(Accumulators, CovarianceWithRandomVariables) { + auto expCtx = ExpressionContextForTest{}; + + // Some randomly generated variables as input. + std::vector<Value> randomVariables = generateRandomVariables(); + + assertCovariance<AccumulatorCovariancePop>(&expCtx, randomVariables, boost::none); + assertCovariance<AccumulatorCovarianceSamp>(&expCtx, randomVariables, boost::none); +} + /* ------------------------- AccumulatorMergeObjects -------------------------- */ TEST(AccumulatorMergeObjects, MergingZeroObjectsShouldReturnEmptyDocument) { |