summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuoxin Xu <ruoxin.xu@mongodb.com>2021-03-09 15:55:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-17 23:47:47 +0000
commit65faf38974f63900b1dcdc6fd1994ce3255911d4 (patch)
treebc4527e60d6054279b2d4aa7035ec2b7585c0026
parent1640bbb7d487ef65722d98ac830090c82b7f7ee5 (diff)
downloadmongo-65faf38974f63900b1dcdc6fd1994ce3255911d4.tar.gz
SERVER-54240 Implement $covariance accumulator-only
-rw-r--r--jstests/aggregation/sources/setWindowFields/covariance.js107
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/accumulator.h41
-rw-r--r--src/mongo/db/pipeline/accumulator_covariance.cpp105
-rw-r--r--src/mongo/db/pipeline/accumulator_test.cpp141
5 files changed, 395 insertions, 0 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/covariance.js b/jstests/aggregation/sources/setWindowFields/covariance.js
new file mode 100644
index 00000000000..cc65ff90488
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/covariance.js
@@ -0,0 +1,107 @@
+/**
+ * Test that $covariance(Pop/Samp) works as a window function.
+ * Currently only tests accumulator-type window function.
+ */
+(function() {
+"use strict";
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+ .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+const coll = db[jsTestName()];
+coll.drop();
+
+const nonRemovableCovStage = {
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {
+ popCovariance:
+ {$covariancePop: ["$x", "$y"], window: {documents: ["unbounded", "current"]}},
+ sampCovariance:
+ {$covarianceSamp: ["$x", "$y"], window: {documents: ["unbounded", "current"]}},
+ }
+ },
+};
+
+// Basic tests.
+assert.commandWorked(coll.insert({_id: 1, x: 0, y: 0}));
+assert.commandWorked(coll.insert({_id: 2, x: 2, y: 2}));
+
+const result = coll.aggregate([nonRemovableCovStage]).toArray();
+assert.eq(result.length, 2);
+assert.eq(result[0].popCovariance.toFixed(2), 0.00);
+assert.eq(result[0].sampCovariance, null);
+assert.eq(result[1].popCovariance.toFixed(2), 1.00);
+assert.eq(result[1].sampCovariance.toFixed(2), 2.00);
+
+coll.drop();
+const nDocs = 10;
+for (let i = 1; i <= nDocs; i++) {
+ assert.commandWorked(coll.insert({
+ _id: i,
+ x: Math.random(),
+ y: Math.random(),
+ }));
+}
+
+// Caculate the running average of vector X and vector Y using $avg window function. The running
+// average of each document is the current average of 'X' and 'Y' in window [unbounded, current].
+// 'runningAvg(X/Y)' will be used to calculate covariance based on the offline algorithm -
+// Cov(x, y) = ( Σ( (xi - avg(x)) * (yi - avg(y)) ) / n )
+function calculateCovarianceOffline() {
+ let resultOffline =
+ coll.aggregate([
+ {
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {
+ runningAvgX:
+ {$avg: "$x", window: {documents: ["unbounded", "current"]}},
+ runningAvgY:
+ {$avg: "$y", window: {documents: ["unbounded", "current"]}},
+ }
+ },
+ },
+ ])
+ .toArray();
+
+ assert.eq(resultOffline.length, nDocs);
+ resultOffline[0].popCovariance = 0.0;
+ resultOffline[0].sampCovariance = null;
+
+ for (let i = 1; i < resultOffline.length; i++) {
+ let c_i = 0.0;
+ for (let j = 0; j <= i; j++) {
+ c_i += ((resultOffline[j].x - resultOffline[i].runningAvgX) *
+ (resultOffline[j].y - resultOffline[i].runningAvgY));
+ }
+ resultOffline[i].popCovariance = c_i / (i + 1);
+ resultOffline[i].sampCovariance = c_i / i;
+ }
+
+ return resultOffline;
+}
+
+// This function compares covariance calculated based on the offline and the online algorithm to
+// test the results are consistent.
+// Note that the server calculates covariance based on an online algorithm -
+// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online
+(function compareCovarianceOfflineAndOnline() {
+ const offlineRes = calculateCovarianceOffline();
+ const onlineRes = coll.aggregate([nonRemovableCovStage]).toArray();
+
+ assert.eq(offlineRes.length, onlineRes.length);
+ assert.eq(onlineRes.length, nDocs);
+ assert.eq(onlineRes[0].popCovariance, 0.0);
+ assert.eq(onlineRes[0].sampCovariance, null);
+ for (let i = 1; i < offlineRes.length; i++) {
+ assert.eq(offlineRes[i].popCovariance.toFixed(5), onlineRes[i].popCovariance.toFixed(5));
+ assert.eq(offlineRes[i].sampCovariance.toFixed(5), onlineRes[i].sampCovariance.toFixed(5));
+ }
+})();
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b73c1e533d5..bcec0c46033 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -118,6 +118,7 @@ env.Library(
'accumulation_statement.cpp',
'accumulator_add_to_set.cpp',
'accumulator_avg.cpp',
+ 'accumulator_covariance.cpp',
'accumulator_first.cpp',
'accumulator_js_reduce.cpp',
'accumulator_last.cpp',
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 98ee003e129..5107445ed25 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -417,6 +417,47 @@ public:
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
};
+class AccumulatorCovariance : public AccumulatorState {
+public:
+ AccumulatorCovariance(ExpressionContext* const expCtx, bool isSamp);
+
+ void processInternal(const Value& input, bool merging) final;
+ Value getValue(bool toBeMerged) final;
+ void reset() final;
+ const char* getOpName() const final {
+ return (_isSamp ? "$covarianceSamp" : "$covariancePop");
+ }
+
+ bool isAssociative() const final {
+ tasserted(5424002,
+ str::stream() << "Invalid call to isAssociative in accumulator " << getOpName());
+ }
+ bool isCommutative() const final {
+ tasserted(5424003,
+ str::stream() << "Invalid call to isCommutative in accumulator " << getOpName());
+ }
+
+private:
+ bool _isSamp;
+ long long _count = 0;
+ double _meanX = 0, _meanY = 0;
+ double _cXY = 0;
+};
+
+class AccumulatorCovarianceSamp final : public AccumulatorCovariance {
+public:
+ explicit AccumulatorCovarianceSamp(ExpressionContext* const expCtx)
+ : AccumulatorCovariance(expCtx, true) {}
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
+};
+
+class AccumulatorCovariancePop final : public AccumulatorCovariance {
+public:
+ explicit AccumulatorCovariancePop(ExpressionContext* const expCtx)
+ : AccumulatorCovariance(expCtx, false) {}
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
+};
+
class AccumulatorMergeObjects : public AccumulatorState {
public:
AccumulatorMergeObjects(ExpressionContext* const expCtx);
diff --git a/src/mongo/db/pipeline/accumulator_covariance.cpp b/src/mongo/db/pipeline/accumulator_covariance.cpp
new file mode 100644
index 00000000000..77524035ab3
--- /dev/null
+++ b/src/mongo/db/pipeline/accumulator_covariance.cpp
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <cmath>
+
+#include "mongo/db/pipeline/accumulator.h"
+
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/window_function_expression.h"
+
+namespace mongo {
+
+REGISTER_NON_REMOVABLE_WINDOW_FUNCTION(
+ covarianceSamp, window_function::ExpressionFromAccumulator<AccumulatorCovarianceSamp>::parse);
+
+REGISTER_NON_REMOVABLE_WINDOW_FUNCTION(
+ covariancePop, window_function::ExpressionFromAccumulator<AccumulatorCovariancePop>::parse);
+
+void AccumulatorCovariance::processInternal(const Value& input, bool merging) {
+ tassert(5424000, "$covariance can't be merged", !merging);
+
+ // Currently we only support array with 2 numeric values as input value. Other types of input or
+ // non-numeric arrays have no impact on covariance.
+ if (!input.isArray())
+ return;
+ const auto& arr = input.getArray();
+ if (arr.size() != 2)
+ return;
+ if (!arr[0].numeric() || !arr[1].numeric())
+ return;
+
+ // This is an implementation of the following algorithm:
+ // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online
+ double x = arr[0].coerceToDouble();
+ double y = arr[1].coerceToDouble();
+ double dx = x - _meanX;
+
+ _count++;
+ _meanX += (dx / _count);
+ _meanY += (y - _meanY) / _count;
+ _cXY += dx * (y - _meanY);
+}
+
+AccumulatorCovariance::AccumulatorCovariance(ExpressionContext* const expCtx, bool isSamp)
+ : AccumulatorState(expCtx), _isSamp(isSamp) {
+ _memUsageBytes = sizeof(*this);
+}
+
+void AccumulatorCovariance::reset() {
+ _count = 0;
+ _meanX = 0;
+ _meanY = 0;
+ _cXY = 0; // 0 only makes sense if "_count" > 0.
+}
+
+Value AccumulatorCovariance::getValue(bool toBeMerged) {
+ const double adjustedCount = (_isSamp ? _count - 1 : _count);
+
+ if (adjustedCount <= 0)
+ return Value(BSONNULL); // Covariance not well defined in this case.
+
+ return Value(_cXY / adjustedCount);
+}
+
+boost::intrusive_ptr<AccumulatorState> AccumulatorCovarianceSamp::create(
+ ExpressionContext* const expCtx) {
+ return new AccumulatorCovarianceSamp(expCtx);
+}
+
+boost::intrusive_ptr<AccumulatorState> AccumulatorCovariancePop::create(
+ ExpressionContext* const expCtx) {
+ return new AccumulatorCovariancePop(expCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 9a08f5984d2..fe9fa550b6f 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
+#include <cmath>
#include <memory>
#include "mongo/db/exec/document_value/document.h"
@@ -431,6 +432,146 @@ TEST(Accumulators, PushRespectsMaxMemoryConstraint) {
ErrorCodes::ExceededMemoryLimit);
}
+/* ------------------------- AccumulatorCorvariance(Samp/Pop) -------------------------- */
+
+// Calculate covariance using the offline algorithm.
+double offlineCovariance(const std::vector<Value>& input, bool isSamp) {
+ // Edge cases return 0 though 'input' should not be empty. Empty input is tested elsewhere.
+ if (input.size() <= 1)
+ return 0;
+
+ double adjustedN = isSamp ? input.size() - 1 : input.size();
+ double meanX = 0;
+ double meanY = 0;
+ double cXY = 0;
+
+ for (auto&& value : input) {
+ meanX += value.getArray()[0].coerceToDouble();
+ meanY += value.getArray()[1].coerceToDouble();
+ }
+ meanX /= input.size();
+ meanY /= input.size();
+
+ for (auto&& value : input) {
+ cXY += (value.getArray()[0].coerceToDouble() - meanX) *
+ (value.getArray()[1].coerceToDouble() - meanY);
+ }
+
+ return cXY / adjustedN;
+}
+
+// Test the accumulator-output covariance (using an online algorithm:
+// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online) is equal to the
+// covariance calculated based on the offline algorithm (cov(x,y) = Σ((xi-avg(x))*(yi-avg(y)))/n)).
+// If 'result' is given, the covariance should also be tested against the given result.
+template <typename AccName>
+static void assertCovariance(ExpressionContext* const expCtx,
+ const std::vector<Value>& input,
+ boost::optional<double> result = boost::none) {
+ auto accum = AccName::create(expCtx);
+ for (auto&& val : input) {
+ accum->process(val, false);
+ }
+ double onlineCov = accum->getValue(false).coerceToDouble();
+ double offlineCov =
+ offlineCovariance(input, std::is_same_v<AccName, AccumulatorCovarianceSamp>);
+
+ ASSERT_LTE(fabs(onlineCov - offlineCov), 1e-10);
+ if (result) {
+ ASSERT_LTE(fabs(onlineCov - *result), 1e-5);
+ }
+}
+
+TEST(Accumulators, CovarianceEdgeCases) {
+ auto expCtx = ExpressionContextForTest{};
+
+ // The sample covariance of variables of single value should be undefined.
+ const std::vector<Value> singlePoint = {
+ Value(std::vector<Value>({Value(0), Value(1)})),
+ };
+
+ const std::vector<Value> nanPoints = {
+ Value(std::vector<Value>({Value(numeric_limits<double>::quiet_NaN()),
+ Value(numeric_limits<double>::quiet_NaN())})),
+ Value(std::vector<Value>({Value(numeric_limits<double>::quiet_NaN()),
+ Value(numeric_limits<double>::quiet_NaN())})),
+ };
+
+ assertExpectedResults<AccumulatorCovariancePop>(
+ &expCtx,
+ {
+ {{}, Value(BSONNULL)},
+ {singlePoint, Value(0.0)},
+ {nanPoints, Value(numeric_limits<double>::quiet_NaN())},
+ },
+ true /* Covariance accumulator can't be merged */);
+
+ assertExpectedResults<AccumulatorCovarianceSamp>(
+ &expCtx,
+ {
+ {{}, Value(BSONNULL)},
+ {singlePoint, Value(BSONNULL)},
+ {nanPoints, Value(numeric_limits<double>::quiet_NaN())},
+ },
+ true /* Covariance accumulator can't be merged */);
+}
+
+TEST(Accumulators, PopulationCovariance) {
+ auto expCtx = ExpressionContextForTest{};
+
+ // Some doubles as input.
+ const std::vector<Value> multiplePoints = {
+ Value(std::vector<Value>({Value(0), Value(1.5)})),
+ Value(std::vector<Value>({Value(1.4), Value(2.5)})),
+ Value(std::vector<Value>({Value(4.7), Value(3.6)})),
+ };
+
+ // Test both offline and online corvariance algorithm with a given result.
+ assertCovariance<AccumulatorCovariancePop>(&expCtx, multiplePoints, 1.655556);
+}
+
+TEST(Accumulators, SampleCovariance) {
+ auto expCtx = ExpressionContextForTest{};
+
+ // Some doubles as input.
+ std::vector<Value> multiplePoints = {
+ Value(std::vector<Value>({Value(0), Value(1.5)})),
+ Value(std::vector<Value>({Value(1.4), Value(2.5)})),
+ Value(std::vector<Value>({Value(4.7), Value(3.6)})),
+ };
+
+ // Test both offline and online corvariance algorithm with a given result.
+ assertCovariance<AccumulatorCovarianceSamp>(&expCtx, multiplePoints, 2.483334);
+}
+
+std::vector<Value> generateRandomVariables() {
+ auto seed = Date_t::now().asInt64();
+ LOGV2(5424001, "Generated new seed is {seed}", "seed"_attr = seed);
+
+ std::vector<Value> output;
+ PseudoRandom prng(seed);
+ const int variableSize = prng.nextInt32(1000) + 2;
+
+ for (int i = 0; i < variableSize; i++) {
+ std::vector<Value> newXY;
+ newXY.push_back(Value(prng.nextCanonicalDouble()));
+ newXY.push_back(Value(prng.nextCanonicalDouble()));
+ output.push_back(Value(newXY));
+ }
+
+ return output;
+}
+
+TEST(Accumulators, CovarianceWithRandomVariables) {
+ auto expCtx = ExpressionContextForTest{};
+
+ // Some randomly generated variables as input.
+ std::vector<Value> randomVariables = generateRandomVariables();
+
+ assertCovariance<AccumulatorCovariancePop>(&expCtx, randomVariables, boost::none);
+ assertCovariance<AccumulatorCovarianceSamp>(&expCtx, randomVariables, boost::none);
+}
+
/* ------------------------- AccumulatorMergeObjects -------------------------- */
TEST(AccumulatorMergeObjects, MergingZeroObjectsShouldReturnEmptyDocument) {