summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Ted Tuckman <ted.tuckman@mongodb.com> 2021-11-03 01:05:33 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-11-03 01:29:35 +0000
commit 758a30de51c4fd33488249079db18c72928901a4 (patch)
tree 96709885eac94680e784575db830f963a57f0612
parent 2eaa640caa44332a0cf63b3e69b62a88f1d9c0cd (diff)
download mongo-758a30de51c4fd33488249079db18c72928901a4.tar.gz
SERVER-60501 Add $locf accumulator
-rw-r--r-- jstests/aggregation/sources/setWindowFields/locf.js 134
-rw-r--r-- src/mongo/db/pipeline/SConscript 1
-rw-r--r-- src/mongo/db/pipeline/accumulator_for_window_functions.h 19
-rw-r--r-- src/mongo/db/pipeline/accumulator_locf.cpp 74
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_expression.h 63
5 files changed, 291 insertions, 0 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/locf.js b/jstests/aggregation/sources/setWindowFields/locf.js
new file mode 100644
index 00000000000..dfce32fae9d
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/locf.js
@@ -0,0 +1,134 @@
+/**
+ * Test that $locf works as a window function.
+ * @tags: [
+ * # Needed as $fill and $locf are 5.2 features.
+ * requires_fcv_52,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/window_function_helpers.js");
+load("jstests/aggregation/extras/utils.js"); // For arrayEq.
+load("jstests/libs/feature_flag_util.js"); // For isEnabled.
+
+// Skip the whole test (early return from the enclosing function) unless the "Fill" feature flag
+// is enabled on this server.
+if (!FeatureFlagUtil.isEnabled(db, "Fill")) {
+ jsTestLog("Skipping as featureFlagFill is not enabled");
+ return;
+}
+
+// Work on the collection named by jsTestName() and drop whatever an earlier run left in it.
+const coll = db[jsTestName()];
+coll.drop();
+
+// Test that $locf doesn't parse with a window: an output spec that carries a 'window' field next
+// to $locf must make the aggregate command fail with FailedToParse.
+assert.commandFailedWithCode(coll.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {val: {$locf: {}, window: []}},
+ }
+ }],
+ cursor: {}
+}),
+ ErrorCodes.FailedToParse);
+
+// Create some documents whose 'val' is a mix of null, missing, numeric and string values.
+let collection = [
+ {_id: 0, val: null},
+ {_id: 1, val: 0},
+ {_id: 2, val: 2},
+ {_id: 3, val: null},
+ {_id: 4},
+ {_id: 5, val: "str"},
+ {_id: 6, val: null},
+ {_id: 7, rand: "rand"},
+];
+assert.commandWorked(coll.insert(collection));
+
+// Run $locf over 'val' with the documents sorted by _id.
+let result = coll.aggregate([{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {val: {$locf: "$val"}},
+ }
+ }])
+ .toArray();
+
+// Every null or missing 'val' is expected to take the closest preceding non-null 'val'. The null
+// at _id 0 has nothing before it and stays null; other fields ('rand') are left untouched.
+let expected = [
+ {_id: 0, val: null},
+ {_id: 1, val: 0},
+ {_id: 2, val: 2},
+ {_id: 3, val: 2},
+ {_id: 4, val: 2},
+ {_id: 5, val: "str"},
+ {_id: 6, val: "str"},
+ {_id: 7, rand: "rand", val: "str"},
+];
+assertArrayEq(result, expected);
+
+// Partitions don't mix values: with partitionBy on 'part', a null 'val' is filled only from an
+// earlier document of the same partition.
+collection = [
+ {_id: 1, val: 0, part: 1},
+ {_id: 2, val: 2, part: 2},
+ {_id: 3, val: null, part: 1},
+ {_id: 4, val: null, part: 2},
+];
+coll.drop();
+assert.commandWorked(coll.insert(collection));
+
+result = coll.aggregate([{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {val: {$locf: "$val"}},
+ partitionBy: "$part",
+ }
+ }])
+ .toArray();
+
+// _id 3 (part 1) takes 0 from _id 1, and _id 4 (part 2) takes 2 from _id 2.
+expected = [
+ {_id: 1, val: 0, part: 1},
+ {_id: 2, val: 2, part: 2},
+ {_id: 3, val: 0, part: 1},
+ {_id: 4, val: 2, part: 2},
+];
+assertArrayEq(result, expected);
+
+// Values stay missing (or null) when every 'val' in the input is missing or null: there is no
+// non-null value to carry forward, so the output is expected to equal the input documents.
+collection = [
+ {_id: 1},
+ {_id: 2},
+ {_id: 3, val: null},
+ {_id: 4, val: null},
+];
+coll.drop();
+assert.commandWorked(coll.insert(collection));
+
+result = coll.aggregate([{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {val: {$locf: "$val"}},
+ }
+ }])
+ .toArray();
+
+assertArrayEq(result, collection);
+
+// Same expectation when null and missing 'val' entries alternate: with no non-null value
+// anywhere in the input, the output is expected to equal the input documents.
+collection = [
+ {_id: 1, val: null},
+ {_id: 2},
+ {_id: 3, val: null},
+ {_id: 4},
+];
+coll.drop();
+assert.commandWorked(coll.insert(collection));
+
+result = coll.aggregate([{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {val: {$locf: "$val"}},
+ }
+ }])
+ .toArray();
+
+assertArrayEq(result, collection);
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 35ed9b974dc..b5c7309ea06 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -151,6 +151,7 @@ env.Library(
'accumulator_integral.cpp',
'accumulator_js_reduce.cpp',
'accumulator_last.cpp',
+ 'accumulator_locf.cpp',
'accumulator_merge_objects.cpp',
'accumulator_min_max.cpp',
'accumulator_multi.cpp',
diff --git a/src/mongo/db/pipeline/accumulator_for_window_functions.h b/src/mongo/db/pipeline/accumulator_for_window_functions.h
index 77f42695935..2ac3edbdca4 100644
--- a/src/mongo/db/pipeline/accumulator_for_window_functions.h
+++ b/src/mongo/db/pipeline/accumulator_for_window_functions.h
@@ -170,4 +170,23 @@ private:
WindowFunctionIntegral _integralWF;
};
+/**
+ * Accumulator behind the $locf ("last observation carried forward") window function. It keeps
+ * the most recent non-nullish input in '_lastNonNull' and reports that as its value. Merging is
+ * not supported: both processInternal() and getValue() tassert when asked to merge.
+ */
+class AccumulatorLocf : public AccumulatorForWindowFunctions {
+public:
+ static constexpr auto kName = "$locf"_sd;
+
+ // Returns the operator name, "$locf".
+ const char* getOpName() const final {
+ return kName.rawData();
+ }
+
+ explicit AccumulatorLocf(ExpressionContext* expCtx);
+
+ // Records 'input' as the carried value unless it is nullish; 'merging' must be false.
+ void processInternal(const Value& input, bool merging) final;
+ // Returns the carried value; 'toBeMerged' must be false.
+ Value getValue(bool toBeMerged) final;
+ // Forgets the carried value by resetting it to a default-constructed Value.
+ void reset() final;
+ // Factory returning a newly allocated AccumulatorLocf.
+ static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* expCtx);
+
+private:
+ // Most recent non-nullish input seen since construction or the last reset().
+ Value _lastNonNull;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_locf.cpp b/src/mongo/db/pipeline/accumulator_locf.cpp
new file mode 100644
index 00000000000..27d04923a59
--- /dev/null
+++ b/src/mongo/db/pipeline/accumulator_locf.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/accumulator_for_window_functions.h"
+
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/window_function_expression.h"
+
+namespace mongo {
+
+// Registers "locf" as a window function parsed by
+// ExpressionFromWindowlessAccumulator<AccumulatorLocf>::parse. Judging by the macro arguments,
+// registration depends on the named FCV constant and on featureFlagFill being enabled.
+// NOTE(review): confirm the exact gating semantics against the macro definition. The accompanying
+// jstest is tagged requires_fcv_52 while this call names kVersion_5_1, so verify the intended
+// minimum version.
+REGISTER_WINDOW_FUNCTION_CONDITIONALLY(
+ locf,
+ mongo::window_function::ExpressionFromWindowlessAccumulator<AccumulatorLocf>::parse,
+ multiversion::FeatureCompatibilityVersion::kVersion_5_1,
+ feature_flags::gFlagFill.isEnabledAndIgnoreFCV());
+
+/**
+ * Builds an accumulator that has not observed any input yet. The memory estimate starts as the
+ * size of this object plus the approximate size reported by the still default-constructed
+ * carried value.
+ */
+AccumulatorLocf::AccumulatorLocf(ExpressionContext* const expCtx)
+    : AccumulatorForWindowFunctions(expCtx) {
+    const auto carriedSize = _lastNonNull.getApproximateSize();
+    _memUsageBytes = sizeof(*this) + carriedSize;
+}
+
+/**
+ * Feeds one input to the accumulator. A nullish input leaves the carried value untouched; any
+ * other input replaces it and refreshes the memory estimate. 'merging' must be false, since
+ * $locf cannot be merged.
+ */
+void AccumulatorLocf::processInternal(const Value& input, bool merging) {
+    tassert(6050100, "$locf can't be merged", !merging);
+
+    // Nullish observations never overwrite what is being carried forward.
+    if (input.nullish()) {
+        return;
+    }
+
+    _lastNonNull = input;
+    const auto carriedSize = _lastNonNull.getApproximateSize();
+    _memUsageBytes = sizeof(*this) + carriedSize;
+}
+
+// Returns the most recent non-nullish input recorded by processInternal(), or a
+// default-constructed Value if none has been recorded since construction or the last reset().
+// 'toBeMerged' must be false because $locf cannot be merged.
+Value AccumulatorLocf::getValue(bool toBeMerged) {
+ tassert(6050102, "$locf can't be merged", !toBeMerged);
+ return _lastNonNull;
+}
+
+/**
+ * Drops the carried value by replacing it with a default-constructed Value, then recomputes
+ * the memory estimate from that fresh value.
+ */
+void AccumulatorLocf::reset() {
+    _lastNonNull = Value();
+    const auto carriedSize = _lastNonNull.getApproximateSize();
+    _memUsageBytes = sizeof(*this) + carriedSize;
+}
+
+/**
+ * Factory: allocates a new AccumulatorLocf for 'expCtx' and hands it back through an
+ * intrusive pointer to the AccumulatorState base.
+ */
+boost::intrusive_ptr<AccumulatorState> AccumulatorLocf::create(ExpressionContext* const expCtx) {
+    boost::intrusive_ptr<AccumulatorState> accumulator(new AccumulatorLocf(expCtx));
+    return accumulator;
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_expression.h b/src/mongo/db/pipeline/window_function/window_function_expression.h
index f6b2d737f1d..7827e6c601c 100644
--- a/src/mongo/db/pipeline/window_function/window_function_expression.h
+++ b/src/mongo/db/pipeline/window_function/window_function_expression.h
@@ -254,6 +254,69 @@ public:
: Expression(expCtx, std::move(accumulatorName), std::move(input), std::move(bounds)) {}
};
+/**
+ * Window function expression for accumulators that accept only an expression argument, i.e. a
+ * spec shaped like '{$func: <expressionArg>}'. Similar to ExpressionFromAccumulator, except that
+ * a 'window' field is rejected at parse time and no removable implementation can be built.
+ */
+template <typename NonRemovableType>
+class ExpressionFromWindowlessAccumulator : public Expression {
+public:
+    ExpressionFromWindowlessAccumulator(ExpressionContext* expCtx,
+                                        std::string accumulatorName,
+                                        boost::intrusive_ptr<::mongo::Expression> input,
+                                        WindowBounds bounds)
+        : Expression(expCtx, std::move(accumulatorName), std::move(input), std::move(bounds)) {}
+
+    /**
+     * Parses 'obj' into an expression that uses the default window bounds.
+     *
+     * Fails with FailedToParse when 'obj' names two functions, contains an argument that is
+     * neither a function nor the 'window' field, names no function at all, or contains a
+     * 'window' field. 'sortBy' is not consulted.
+     */
+    static boost::intrusive_ptr<Expression> parse(BSONObj obj,
+                                                  const boost::optional<SortPattern>& sortBy,
+                                                  ExpressionContext* expCtx) {
+        boost::optional<StringData> functionName;
+        boost::intrusive_ptr<::mongo::Expression> operand;
+        WindowBounds defaultWindow = WindowBounds::defaultBounds();
+        bool sawWindowField = false;
+
+        for (const auto& elem : obj) {
+            const auto fieldName = elem.fieldNameStringData();
+
+            // Only note the presence of 'window' here; it is reported after the loop so that
+            // the other parse errors are raised first.
+            if (fieldName == kWindowArg) {
+                sawWindowField = true;
+                continue;
+            }
+
+            if (!isFunction(fieldName)) {
+                uasserted(ErrorCodes::FailedToParse,
+                          str::stream()
+                              << "Window function found an unknown argument: " << fieldName);
+            }
+
+            uassert(ErrorCodes::FailedToParse,
+                    "Cannot specify two functions in window function spec",
+                    !functionName);
+            functionName = fieldName;
+            operand = ::mongo::Expression::parseOperand(expCtx, elem, expCtx->variablesParseState);
+        }
+
+        uassert(ErrorCodes::FailedToParse,
+                "Must specify a window function in output field",
+                functionName);
+        uassert(ErrorCodes::FailedToParse,
+                str::stream() << "'window' field is not allowed in " << functionName,
+                !sawWindowField);
+
+        return make_intrusive<ExpressionFromWindowlessAccumulator<NonRemovableType>>(
+            expCtx, functionName->toString(), std::move(operand), std::move(defaultWindow));
+    }
+
+    /** Creates the accumulator through NonRemovableType's static factory. */
+    boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final {
+        return NonRemovableType::create(_expCtx);
+    }
+
+    /** Always trips a tassert: these functions have no removable (windowed) implementation. */
+    std::unique_ptr<WindowFunctionState> buildRemovable() const final {
+        tasserted(6050101,
+                  str::stream() << "Window function " << _accumulatorName
+                                << " is not supported with a window");
+    }
+
+    /** Serializes to a one-field document: the accumulator name mapped to the serialized input. */
+    Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+        const bool isExplain = static_cast<bool>(explain);
+        MutableDocument spec;
+        spec.addField(_accumulatorName, Value(_input->serialize(isExplain)));
+        return spec.freezeToValue();
+    }
+};
+
template <typename NonRemovableType, typename RemovableType>
class ExpressionRemovable : public Expression {
public: