summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/aggregation/accumulators/accumulator_js.js127
-rw-r--r--jstests/aggregation/accumulators/accumulator_js_size_limits.js87
-rw-r--r--jstests/aggregation/accumulators/internal_js_reduce.js2
-rw-r--r--jstests/sharding/javascript_heap_limit.js31
-rw-r--r--src/mongo/db/commands/mr_common.cpp16
-rw-r--r--src/mongo/db/pipeline/accumulation_statement.cpp9
-rw-r--r--src/mongo/db/pipeline/accumulation_statement.h117
-rw-r--r--src/mongo/db/pipeline/accumulator.h90
-rw-r--r--src/mongo/db/pipeline/accumulator_add_to_set.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_avg.cpp6
-rw-r--r--src/mongo/db/pipeline/accumulator_first.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_js_reduce.cpp247
-rw-r--r--src/mongo/db/pipeline/accumulator_js_reduce.h66
-rw-r--r--src/mongo/db/pipeline/accumulator_last.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_merge_objects.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_min_max.cpp6
-rw-r--r--src/mongo/db/pipeline/accumulator_push.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_std_dev.cpp8
-rw-r--r--src/mongo/db/pipeline/accumulator_sum.cpp6
-rw-r--r--src/mongo/db/pipeline/accumulator_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp51
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h3
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp45
-rw-r--r--src/mongo/db/pipeline/document_source_group.h2
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp20
-rw-r--r--src/mongo/db/pipeline/expression.h14
-rw-r--r--src/mongo/db/pipeline/expression_visitor.h2
27 files changed, 167 insertions, 814 deletions
diff --git a/jstests/aggregation/accumulators/accumulator_js.js b/jstests/aggregation/accumulators/accumulator_js.js
deleted file mode 100644
index 2942ef641dd..00000000000
--- a/jstests/aggregation/accumulators/accumulator_js.js
+++ /dev/null
@@ -1,127 +0,0 @@
-// Test the behavior of user-defined (Javascript) accumulators.
-(function() {
-"use strict";
-
-load('jstests/aggregation/extras/utils.js');
-
-db.accumulator_js.drop();
-
-for (const word of ["hello", "world", "world", "hello", "hi"]) {
- db.accumulator_js.insert({word: word, val: 1});
-}
-
-const command = {
- aggregate: 'accumulator_js',
- cursor: {},
- pipeline: [{
- $group: {
- _id: "$word",
- wordCount: {
- $accumulator: {
- init: function() {
- return 0;
- },
- accumulateArgs: ["$val"],
- accumulate: function(state, val) {
- return state + val;
- },
- merge: function(state1, state2) {
- return state1 + state2;
- },
- finalize: function(state) {
- return state;
- }
- }
- }
- }
- }],
-};
-
-const expectedResults = [
- {_id: "hello", wordCount: 2},
- {_id: "world", wordCount: 2},
- {_id: "hi", wordCount: 1},
-];
-
-let res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor);
-
-// Test that the functions can be passed as strings.
-{
- const accumulatorSpec = command.pipeline[0].$group.wordCount.$accumulator;
- accumulatorSpec.init = accumulatorSpec.init.toString();
- accumulatorSpec.accumulate = accumulatorSpec.accumulate.toString();
- accumulatorSpec.merge = accumulatorSpec.merge.toString();
- accumulatorSpec.finalize = accumulatorSpec.finalize.toString();
-}
-res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor);
-
-// Test that finalize is optional.
-delete command.pipeline[0].$group.wordCount.$accumulator.finalize;
-res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor);
-
-// Test a finalizer other than the identity function. Finalizers are useful when the intermediate
-// state needs to be a different format from the final result.
-res = assert.commandWorked(db.runCommand(Object.merge(command, {
- pipeline: [{
- $group: {
- _id: 1,
- avgWordLen: {
- $accumulator: {
- init: function() {
- return {count: 0, sum: 0};
- },
- accumulateArgs: [{$strLenCP: "$word"}],
- accumulate: function({count, sum}, wordLen) {
- return {count: count + 1, sum: sum + wordLen};
- },
- merge: function(s1, s2) {
- return {count: s1.count + s2.count, sum: s1.sum + s2.sum};
- },
- finalize: function({count, sum}) {
- return sum / count;
- },
- lang: 'js',
- }
- },
- }
- }],
-})));
-assert(resultsEq(res.cursor.firstBatch, [{_id: 1, avgWordLen: 22 / 5}]), res.cursor);
-
-// Test that a null word is considered a valid value.
-assert.commandWorked(db.accumulator_js.insert({word: null, val: 1}));
-expectedResults.push({_id: null, wordCount: 1});
-res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor);
-
-// Test that missing fields become JS null.
-// This is similar to how most other agg operators work.
-// TODO SERVER-45450 is this a problem for mapreduce?
-assert(db.accumulator_js.drop());
-assert.commandWorked(db.accumulator_js.insert({sentinel: 1}));
-command.pipeline = [{
- $group: {
- _id: 1,
- value: {
- $accumulator: {
- init: function() {
- return [];
- },
- accumulateArgs: ["$no_such_field"],
- accumulate: function(state, value) {
- return state.concat([value]);
- },
- merge: function(s1, s2) {
- return s1.concat(s2);
- },
- lang: 'js',
- }
- }
- }
-}];
-res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, [{_id: 1, value: [null]}]), res.cursor);
-})();
diff --git a/jstests/aggregation/accumulators/accumulator_js_size_limits.js b/jstests/aggregation/accumulators/accumulator_js_size_limits.js
deleted file mode 100644
index 80bbcc49e07..00000000000
--- a/jstests/aggregation/accumulators/accumulator_js_size_limits.js
+++ /dev/null
@@ -1,87 +0,0 @@
-// Test several different kinds of size limits on user-defined (Javascript) accumulators.
-(function() {
-"use strict";
-
-const coll = db.accumulator_js_size_limits;
-
-function runExample(groupKey, accumulatorSpec) {
- return coll.runCommand({
- aggregate: coll.getName(),
- cursor: {},
- pipeline: [{
- $group: {
- _id: groupKey,
- accumulatedField: {$accumulator: accumulatorSpec},
- }
- }]
- });
-}
-
-// Accumulator tries to create too long a String; it can't be serialized to BSON.
-coll.drop();
-assert.commandWorked(coll.insert({}));
-let res = runExample(1, {
- init: function() {
- return "a".repeat(20 * 1024 * 1024);
- },
- accumulate: function() {
- throw 'accumulate should not be called';
- },
- accumulateArgs: [],
- merge: function() {
- throw 'merge should not be called';
- },
- finalize: function() {
- throw 'finalize should not be called';
- },
- lang: 'js',
-});
-assert.commandFailedWithCode(res, [10334]);
-
-// Accumulator tries to return BSON larger than 16MB from JS.
-assert(coll.drop());
-assert.commandWorked(coll.insert({}));
-res = runExample(1, {
- init: function() {
- const str = "a".repeat(1 * 1024 * 1024);
- return Array.from({length: 20}, () => str);
- },
- accumulate: function() {
- throw 'accumulate should not be called';
- },
- accumulateArgs: [],
- merge: function() {
- throw 'merge should not be called';
- },
- finalize: function() {
- throw 'finalize should not be called';
- },
- lang: 'js',
-});
-assert.commandFailedWithCode(res, [17260]);
-
-// $group size limit exceeded, and cannot spill.
-assert(coll.drop());
-assert.commandWorked(coll.insert(Array.from({length: 200}, (_, i) => ({_id: i}))));
-// By grouping on _id, each group contains only 1 document. This means it creates many
-// AccumulatorState instances.
-res = runExample("$_id", {
- init: function() {
- // Each accumulator state is big enough to be expensive, but not big enough to hit the BSON
- // size limit.
- return "a".repeat(1 * 1024 * 1024);
- },
- accumulate: function(state) {
- return state;
- },
- accumulateArgs: [1],
- merge: function(state1, state2) {
- return state1;
- },
- finalize: function(state) {
- return state.length;
- },
- lang: 'js',
-});
-assert.commandFailedWithCode(res, [16945]);
-})();
diff --git a/jstests/aggregation/accumulators/internal_js_reduce.js b/jstests/aggregation/accumulators/internal_js_reduce.js
index b315adacf70..ae470b0950f 100644
--- a/jstests/aggregation/accumulators/internal_js_reduce.js
+++ b/jstests/aggregation/accumulators/internal_js_reduce.js
@@ -43,7 +43,7 @@ const expectedResults = [
];
let res = assert.commandWorked(db.runCommand(command));
-assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor);
+assert(resultsEq(res.cursor.firstBatch, expectedResults, res.cursor));
//
// Test that the reduce function also accepts a string argument.
diff --git a/jstests/sharding/javascript_heap_limit.js b/jstests/sharding/javascript_heap_limit.js
index f92907fb6b3..81cf5a8d8f2 100644
--- a/jstests/sharding/javascript_heap_limit.js
+++ b/jstests/sharding/javascript_heap_limit.js
@@ -49,7 +49,7 @@ const aggregateWithJSFunction = {
{$project: {y: {"$function": {args: [], body: allocateLargeString, lang: "js"}}}}
]
};
-const aggregateWithInternalJsReduce = {
+const aggregateWithJSAccumulator = {
aggregate: "coll",
cursor: {},
pipeline: [{
@@ -64,24 +64,6 @@ const aggregateWithInternalJsReduce = {
}
}]
};
-const aggregateWithUserDefinedAccumulator = {
- aggregate: "coll",
- cursor: {},
- pipeline: [{
- $group: {
- _id: "$x",
- value: {
- $accumulator: {
- init: allocateLargeString,
- accumulate: allocateLargeString,
- accumulateArgs: [{k: "$x", v: "$x"}],
- merge: allocateLargeString,
- lang: 'js',
- }
- }
- }
- }]
-};
const findWithJavaScriptFunction = {
find: "coll",
filter: {$expr: {"$function": {args: [], body: allocateLargeString, lang: "js"}}}
@@ -100,17 +82,14 @@ function runCommonTests(db) {
// All commands are expected to work with a sufficient JS heap size.
setHeapSizeLimitMB({db: db, queryLimit: sufficentHeapSizeMB, globalLimit: sufficentHeapSizeMB});
assert.commandWorked(db.runCommand(aggregateWithJSFunction));
- assert.commandWorked(db.runCommand(aggregateWithInternalJsReduce));
- assert.commandWorked(db.runCommand(aggregateWithUserDefinedAccumulator));
+ assert.commandWorked(db.runCommand(aggregateWithJSAccumulator));
// The aggregate command is expected to fail when the aggregation specific heap size limit is
// too low.
setHeapSizeLimitMB({db: db, queryLimit: tooSmallHeapSizeMB, globalLimit: sufficentHeapSizeMB});
assert.commandFailedWithCode(db.runCommand(aggregateWithJSFunction),
ErrorCodes.JSInterpreterFailure);
- assert.commandFailedWithCode(db.runCommand(aggregateWithInternalJsReduce),
- ErrorCodes.JSInterpreterFailure);
- assert.commandFailedWithCode(db.runCommand(aggregateWithUserDefinedAccumulator),
+ assert.commandFailedWithCode(db.runCommand(aggregateWithJSAccumulator),
ErrorCodes.JSInterpreterFailure);
// All commands are expected to fail when the global heap size limit is too low, regardless
@@ -118,9 +97,7 @@ function runCommonTests(db) {
setHeapSizeLimitMB({db: db, queryLimit: sufficentHeapSizeMB, globalLimit: tooSmallHeapSizeMB});
assert.commandFailedWithCode(db.runCommand(aggregateWithJSFunction),
ErrorCodes.JSInterpreterFailure);
- assert.commandFailedWithCode(db.runCommand(aggregateWithInternalJsReduce),
- ErrorCodes.JSInterpreterFailure);
- assert.commandFailedWithCode(db.runCommand(aggregateWithUserDefinedAccumulator),
+ assert.commandFailedWithCode(db.runCommand(aggregateWithJSAccumulator),
ErrorCodes.JSInterpreterFailure);
}
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index 52cd8bf0434..445621c1ad9 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -124,19 +124,15 @@ auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string co
}
auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
- auto initializer = ExpressionArray::create(expCtx, {});
- auto argument = ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState);
- auto reduceFactory = [expCtx, funcSource = std::move(code)]() {
+ auto accumulatorArgument =
+ ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState);
+ auto reduceFactory = [expCtx, funcSource = code]() {
return AccumulatorInternalJsReduce::create(expCtx, funcSource);
};
- AccumulationStatement jsReduce("value",
- AccumulationExpression(std::move(initializer),
- std::move(argument),
- std::move(reduceFactory)));
- auto groupKeyExpression =
- ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
+ AccumulationStatement jsReduce("value", std::move(accumulatorArgument), reduceFactory);
+ auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
return DocumentSourceGroup::create(expCtx,
- std::move(groupKeyExpression),
+ std::move(groupExpr),
make_vector<AccumulationStatement>(std::move(jsReduce)),
boost::none);
}
diff --git a/src/mongo/db/pipeline/accumulation_statement.cpp b/src/mongo/db/pipeline/accumulation_statement.cpp
index 72b2b80a6c6..18595477227 100644
--- a/src/mongo/db/pipeline/accumulation_statement.cpp
+++ b/src/mongo/db/pipeline/accumulation_statement.cpp
@@ -65,8 +65,8 @@ AccumulationStatement::Parser& AccumulationStatement::getParser(StringData name)
return it->second;
}
-boost::intrusive_ptr<AccumulatorState> AccumulationStatement::makeAccumulator() const {
- return expr.factory();
+boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator() const {
+ return _factory();
}
AccumulationStatement AccumulationStatement::parseAccumulationStatement(
@@ -98,10 +98,9 @@ AccumulationStatement AccumulationStatement::parseAccumulationStatement(
specElem.type() != BSONType::Array);
auto&& parser = AccumulationStatement::getParser(accName);
- auto [initializer, argument, factory] = parser(expCtx, specElem, vps);
+ auto [expression, factory] = parser(expCtx, specElem, vps);
- return AccumulationStatement(fieldName.toString(),
- AccumulationExpression(initializer, argument, factory));
+ return AccumulationStatement(fieldName.toString(), expression, factory);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulation_statement.h b/src/mongo/db/pipeline/accumulation_statement.h
index fff666141d3..91ac3a1aff3 100644
--- a/src/mongo/db/pipeline/accumulation_statement.h
+++ b/src/mongo/db/pipeline/accumulation_statement.h
@@ -38,8 +38,8 @@
namespace mongo {
/**
- * Registers an AccumulatorState to have the name 'key'. When an accumulator with name '$key' is
- * found during parsing, 'factory' will be called to construct the AccumulatorState.
+ * Registers an Accumulator to have the name 'key'. When an accumulator with name '$key' is found
+ * during parsing, 'factory' will be called to construct the Accumulator.
*
* As an example, if your accumulator looks like {"$foo": <args>}, with a factory method 'create',
* you would add this line:
@@ -52,102 +52,26 @@ namespace mongo {
}
/**
- * AccumulatorExpression represents the right-hand side of an AccumulationStatement. Note this is
- * different from Expression; they are different nonterminals in the grammar.
- *
- * For example, in
- * {$group: {
- * _id: 1,
- * count: {$sum: {$size: "$tags"}}
- * }}
- *
- * we would say:
- * The AccumulationStatement is count: {$sum: {$size: "$tags"}}
- * The AccumulationExpression is {$sum: {$size: "$tags"}}
- * The AccumulatorState::Factory is $sum
- * The argument Expression is {$size: "$tags"}
- * There is no initializer Expression.
- *
- * "$sum" corresponds to an AccumulatorState::Factory rather than AccumulatorState because
- * AccumulatorState is an execution concept, not an AST concept: each instance of AccumulatorState
- * contains intermediate values being accumulated.
- *
- * Like most accumulators, $sum does not require or accept an initializer Expression. At time of
- * writing, only user-defined accumulators accept an initializer.
- *
- * For example, in:
- * {$group: {
- * _id: {cc: "$country_code"},
- * top_stories: {$accumulator: {
- * init: function(cc) { ... },
- * initArgs: ["$cc"],
- * accumulate: function(state, title, upvotes) { ... },
- * accumulateArgs: ["$title", "$upvotes"],
- * merge: function(state1, state2) { ... },
- * lang: "js",
- * }}
- * }}
- *
- * we would say:
- * The AccumulationStatement is top_stories: {$accumulator: ... }
- * The AccumulationExpression is {$accumulator: ... }
- * The argument Expression is ["$cc"]
- * The initializer Expression is ["$title", "$upvotes"]
- * The AccumulatorState::Factory holds all the other arguments to $accumulator.
- *
- */
-struct AccumulationExpression {
- AccumulationExpression(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
- AccumulatorState::Factory factory)
- : initializer(initializer), argument(argument), factory(factory) {
- invariant(this->initializer);
- invariant(this->argument);
- }
-
- // The expression to use to obtain the input to the accumulator.
- boost::intrusive_ptr<Expression> initializer;
-
- // An expression evaluated once per input document, and passed to AccumulatorState::process.
- boost::intrusive_ptr<Expression> argument;
-
- // Constructs an AccumulatorState to do actual accumulation.
- boost::intrusive_ptr<AccumulatorState> makeAccumulator() const;
-
- // A no argument function object that can be called to create an AccumulatorState.
- const AccumulatorState::Factory factory;
-};
-
-/**
- * A default parser for any accumulator that only takes a single expression as an argument. Returns
- * the expression to be evaluated by the accumulator and an AccumulatorState::Factory.
- */
-template <class AccName>
-AccumulationExpression genericParseSingleExpressionAccumulator(
- boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) {
- auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL));
- auto argument = Expression::parseOperand(expCtx, elem, vps);
- return {initializer, argument, [expCtx]() { return AccName::create(expCtx); }};
-}
-
-/**
* A class representing a user-specified accumulation, including the field name to put the
* accumulated result in, which accumulator to use, and the expression used to obtain the input to
- * the AccumulatorState.
+ * the Accumulator.
*/
class AccumulationStatement {
public:
- using Parser = std::function<AccumulationExpression(
+ using Parser = std::function<std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>(
boost::intrusive_ptr<ExpressionContext>, BSONElement, VariablesParseState)>;
-
- AccumulationStatement(std::string fieldName, AccumulationExpression expr)
- : fieldName(std::move(fieldName)), expr(std::move(expr)) {}
+ AccumulationStatement(std::string fieldName,
+ boost::intrusive_ptr<Expression> expression,
+ Accumulator::Factory factory)
+ : fieldName(std::move(fieldName)),
+ expression(std::move(expression)),
+ _factory(std::move(factory)) {}
/**
* Parses a BSONElement that is an accumulated field, and returns an AccumulationStatement for
* that accumulated field.
*
- * Throws an AssertionException if parsing fails.
+ * Throws a AssertionException if parsing fails.
*/
static AccumulationStatement parseAccumulationStatement(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -155,9 +79,9 @@ public:
const VariablesParseState& vps);
/**
- * Registers an AccumulatorState with a parsing function, so that when an accumulator with the
- * given name is encountered during parsing, we will know to call 'factory' to construct that
- * AccumulatorState.
+ * Registers an Accumulator with a parsing function, so that when an accumulator with the given
+ * name is encountered during parsing, we will know to call 'factory' to construct that
+ * Accumulator.
*
* DO NOT call this method directly. Instead, use the REGISTER_ACCUMULATOR macro defined in this
* file.
@@ -166,17 +90,22 @@ public:
/**
* Retrieves the Parser for the accumulator specified by the given name, and raises an error if
- * there is no such AccumulatorState registered.
+ * there is no such Accumulator registered.
*/
static Parser& getParser(StringData name);
// The field name is used to store the results of the accumulation in a result document.
std::string fieldName;
- AccumulationExpression expr;
+ // The expression to use to obtain the input to the accumulator.
+ boost::intrusive_ptr<Expression> expression;
+
+ // Constructs an Accumulator to do actual accumulation.
+ boost::intrusive_ptr<Accumulator> makeAccumulator() const;
- // Constructs an AccumulatorState to do actual accumulation.
- boost::intrusive_ptr<AccumulatorState> makeAccumulator() const;
+private:
+ // A no argument function object that can be called to create an Accumulator.
+ const Accumulator::Factory _factory;
};
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index e59ccb543da..cb58bc7a3b1 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -51,28 +51,23 @@ namespace mongo {
* This enum indicates which documents an accumulator needs to see in order to compute its output.
*/
enum class AccumulatorDocumentsNeeded {
- // AccumulatorState needs to see all documents in a group.
+ // Accumulator needs to see all documents in a group.
kAllDocuments,
- // AccumulatorState only needs to see one document in a group, and when there is a sort order,
- // that document must be the first document.
+ // Accumulator only needs to see one document in a group, and when there is a sort order, that
+ // document must be the first document.
kFirstDocument,
- // AccumulatorState only needs to see one document in a group, and when there is a sort order,
- // that document must be the last document.
+ // Accumulator only needs to see one document in a group, and when there is a sort order, that
+ // document must be the last document.
kLastDocument,
};
-class AccumulatorState : public RefCountable {
+class Accumulator : public RefCountable {
public:
- using Factory = std::function<boost::intrusive_ptr<AccumulatorState>()>;
+ using Factory = std::function<boost::intrusive_ptr<Accumulator>()>;
- AccumulatorState(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
-
- /** Marks the beginning of a new group. The input is the result of evaluating
- * AccumulatorExpression::initializer, which can read from the group key.
- */
- virtual void startNewGroup(const Value& input) {}
+ Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
/** Process input and update internal state.
* merging should be true when processing outputs from getValue(true).
@@ -94,7 +89,7 @@ public:
return _memUsageBytes;
}
- /// Reset this accumulator to a fresh state, ready for a new call to startNewGroup.
+ /// Reset this accumulator to a fresh state ready to receive input.
virtual void reset() = 0;
virtual bool isAssociative() const {
@@ -114,19 +109,9 @@ public:
*
* When executing on a sharded cluster, the result of this function will be sent to each
* individual shard.
- *
- * This implementation assumes the accumulator has the simple syntax { <name>: <argument> },
- * such as { $sum: <argument> }. This syntax has no room for an initializer. Subclasses with a
- * more elaborate syntax such should override this method.
*/
- virtual Document serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
- bool explain) const {
- ExpressionConstant const* ec = dynamic_cast<ExpressionConstant const*>(initializer.get());
- invariant(ec);
- invariant(ec->getValue().nullish());
-
- return DOC(getOpName() << argument->serialize(explain));
+ virtual Document serialize(boost::intrusive_ptr<Expression> expression, bool explain) const {
+ return DOC(getOpName() << expression->serialize(explain));
}
virtual AccumulatorDocumentsNeeded documentsNeeded() const {
@@ -148,7 +133,20 @@ private:
boost::intrusive_ptr<ExpressionContext> _expCtx;
};
-class AccumulatorAddToSet final : public AccumulatorState {
+/**
+ * A default parser for any accumulator that only takes a single expression as an argument. Returns
+ * the expression to be evaluated by the accumulator and an Accumulator::Factory.
+ */
+template <class AccName>
+std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>
+genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx,
+ BSONElement elem,
+ VariablesParseState vps) {
+ auto exprValue = Expression::parseOperand(expCtx, elem, vps);
+ return {exprValue, [expCtx]() { return AccName::create(expCtx); }};
+}
+
+class AccumulatorAddToSet final : public Accumulator {
public:
/**
* Creates a new $addToSet accumulator. If no memory limit is given, defaults to the value of
@@ -162,7 +160,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
bool isAssociative() const final {
@@ -178,7 +176,7 @@ private:
int _maxMemUsageBytes;
};
-class AccumulatorFirst final : public AccumulatorState {
+class AccumulatorFirst final : public Accumulator {
public:
explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -187,7 +185,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
AccumulatorDocumentsNeeded documentsNeeded() const final {
@@ -199,7 +197,7 @@ private:
Value _first;
};
-class AccumulatorLast final : public AccumulatorState {
+class AccumulatorLast final : public Accumulator {
public:
explicit AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -208,7 +206,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
AccumulatorDocumentsNeeded documentsNeeded() const final {
@@ -219,7 +217,7 @@ private:
Value _last;
};
-class AccumulatorSum final : public AccumulatorState {
+class AccumulatorSum final : public Accumulator {
public:
explicit AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -228,7 +226,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
bool isAssociative() const final {
@@ -245,7 +243,7 @@ private:
Decimal128 decimalTotal;
};
-class AccumulatorMinMax : public AccumulatorState {
+class AccumulatorMinMax : public Accumulator {
public:
enum Sense : int {
MIN = 1,
@@ -276,7 +274,7 @@ class AccumulatorMax final : public AccumulatorMinMax {
public:
explicit AccumulatorMax(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorMinMax(expCtx, MAX) {}
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
@@ -284,11 +282,11 @@ class AccumulatorMin final : public AccumulatorMinMax {
public:
explicit AccumulatorMin(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorMinMax(expCtx, MIN) {}
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
-class AccumulatorPush final : public AccumulatorState {
+class AccumulatorPush final : public Accumulator {
public:
/**
* Creates a new $push accumulator. If no memory limit is given, defaults to the value of the
@@ -302,7 +300,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
@@ -310,7 +308,7 @@ private:
int _maxMemUsageBytes;
};
-class AccumulatorAvg final : public AccumulatorState {
+class AccumulatorAvg final : public Accumulator {
public:
explicit AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -319,7 +317,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
@@ -335,7 +333,7 @@ private:
long long _count;
};
-class AccumulatorStdDev : public AccumulatorState {
+class AccumulatorStdDev : public Accumulator {
public:
AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp);
@@ -355,7 +353,7 @@ class AccumulatorStdDevPop final : public AccumulatorStdDev {
public:
explicit AccumulatorStdDevPop(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorStdDev(expCtx, false) {}
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
@@ -363,11 +361,11 @@ class AccumulatorStdDevSamp final : public AccumulatorStdDev {
public:
explicit AccumulatorStdDevSamp(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorStdDev(expCtx, true) {}
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
-class AccumulatorMergeObjects : public AccumulatorState {
+class AccumulatorMergeObjects : public Accumulator {
public:
AccumulatorMergeObjects(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -376,7 +374,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index 7af05eda870..3a6a7b22944 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -81,7 +81,7 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) {
AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<int> maxMemoryUsageBytes)
- : AccumulatorState(expCtx),
+ : Accumulator(expCtx),
_set(expCtx->getValueComparator().makeUnorderedValueSet()),
_maxMemUsageBytes(maxMemoryUsageBytes.value_or(internalQueryMaxAddToSetBytes.load())) {
_memUsageBytes = sizeof(*this);
@@ -92,7 +92,7 @@ void AccumulatorAddToSet::reset() {
_memUsageBytes = sizeof(*this);
}
-intrusive_ptr<AccumulatorState> AccumulatorAddToSet::create(
+intrusive_ptr<Accumulator> AccumulatorAddToSet::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorAddToSet(expCtx, boost::none);
}
diff --git a/src/mongo/db/pipeline/accumulator_avg.cpp b/src/mongo/db/pipeline/accumulator_avg.cpp
index 1e7f55f5ab3..2efc2cee191 100644
--- a/src/mongo/db/pipeline/accumulator_avg.cpp
+++ b/src/mongo/db/pipeline/accumulator_avg.cpp
@@ -93,7 +93,7 @@ void AccumulatorAvg::processInternal(const Value& input, bool merging) {
_count++;
}
-intrusive_ptr<AccumulatorState> AccumulatorAvg::create(
+intrusive_ptr<Accumulator> AccumulatorAvg::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorAvg(expCtx);
}
@@ -123,8 +123,8 @@ Value AccumulatorAvg::getValue(bool toBeMerged) {
}
AccumulatorAvg::AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : AccumulatorState(expCtx), _isDecimal(false), _count(0) {
- // This is a fixed size AccumulatorState so we never need to update this
+ : Accumulator(expCtx), _isDecimal(false), _count(0) {
+ // This is a fixed size Accumulator so we never need to update this
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp
index b48b9e2330e..aed285cf5db 100644
--- a/src/mongo/db/pipeline/accumulator_first.cpp
+++ b/src/mongo/db/pipeline/accumulator_first.cpp
@@ -59,7 +59,7 @@ Value AccumulatorFirst::getValue(bool toBeMerged) {
}
AccumulatorFirst::AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : AccumulatorState(expCtx), _haveFirst(false) {
+ : Accumulator(expCtx), _haveFirst(false) {
_memUsageBytes = sizeof(*this);
}
@@ -70,7 +70,7 @@ void AccumulatorFirst::reset() {
}
-intrusive_ptr<AccumulatorState> AccumulatorFirst::create(
+intrusive_ptr<Accumulator> AccumulatorFirst::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorFirst(expCtx);
}
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
index 41183c7ac40..877ac1ca41e 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
@@ -37,8 +37,10 @@ namespace mongo {
REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::parseInternalJsReduce);
-AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
- boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) {
+std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>
+AccumulatorInternalJsReduce::parseInternalJsReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
+ BSONElement elem,
+ VariablesParseState vps) {
uassert(31326,
str::stream() << kAccumulatorName << " requires a document argument, but found "
<< elem.type(),
@@ -46,13 +48,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
BSONObj obj = elem.embeddedObject();
std::string funcSource;
- boost::intrusive_ptr<Expression> argument;
+ boost::intrusive_ptr<Expression> dataExpr;
for (auto&& element : obj) {
if (element.fieldNameStringData() == "eval") {
funcSource = parseReduceFunction(element);
} else if (element.fieldNameStringData() == "data") {
- argument = Expression::parseOperand(expCtx, element, vps);
+ dataExpr = Expression::parseOperand(expCtx, element, vps);
} else {
uasserted(31243,
str::stream() << "Invalid argument specified to " << kAccumulatorName << ": "
@@ -66,14 +68,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
uassert(31349,
str::stream() << kAccumulatorName
<< " requires 'data' argument, recieved input: " << obj.toString(false),
- argument);
+ dataExpr);
auto factory = [expCtx, funcSource = funcSource]() {
return AccumulatorInternalJsReduce::create(expCtx, funcSource);
};
- auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL));
- return {std::move(initializer), std::move(argument), std::move(factory)};
+ return {std::move(dataExpr), std::move(factory)};
}
std::string AccumulatorInternalJsReduce::parseReduceFunction(BSONElement func) {
@@ -167,7 +168,7 @@ Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) {
}
}
-boost::intrusive_ptr<AccumulatorState> AccumulatorInternalJsReduce::create(
+boost::intrusive_ptr<Accumulator> AccumulatorInternalJsReduce::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource) {
return make_intrusive<AccumulatorInternalJsReduce>(expCtx, funcSource);
@@ -180,233 +181,9 @@ void AccumulatorInternalJsReduce::reset() {
}
// Returns this accumulator serialized as a Value along with the reduce function.
-Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
+Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> expression,
bool explain) const {
- return DOC(getOpName() << DOC("data" << argument->serialize(explain) << "eval" << _funcSource));
+ return DOC(
+ getOpName() << DOC("data" << expression->serialize(explain) << "eval" << _funcSource));
}
-
-REGISTER_ACCUMULATOR(accumulator, AccumulatorJs::parse);
-
-boost::intrusive_ptr<AccumulatorState> AccumulatorJs::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::string init,
- std::string accumulate,
- std::string merge,
- std::string finalize) {
- return new AccumulatorJs(
- expCtx, std::move(init), std::move(accumulate), std::move(merge), std::move(finalize));
-}
-
-namespace {
-// Parses a constant expression of type String or Code.
-std::string parseFunction(StringData fieldName,
- boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps) {
- boost::intrusive_ptr<Expression> expr = Expression::parseOperand(expCtx, elem, vps);
- expr = expr->optimize();
- ExpressionConstant* ec = dynamic_cast<ExpressionConstant*>(expr.get());
- uassert(4544701,
- str::stream() << "$accumulator '" << fieldName << "' must be a constant expression",
- ec);
- Value v = ec->getValue();
- uassert(4544702,
- str::stream() << "$accumulator '" << fieldName << "' must be a String or Code",
- v.getType() == BSONType::String || v.getType() == BSONType::Code);
- return v.coerceToString();
-}
-} // namespace
-
-
-Document AccumulatorJs::serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
- bool explain) const {
- MutableDocument args;
- args.addField("init", Value(_init));
- args.addField("initArgs", Value(initializer->serialize(explain)));
- args.addField("accumulate", Value(_accumulate));
- args.addField("accumulateArgs", Value(argument->serialize(explain)));
- args.addField("merge", Value(_merge));
- args.addField("finalize", Value(_finalize));
- args.addField("lang", Value("js"_sd));
- return DOC(getOpName() << args.freeze());
-}
-
-AccumulationExpression AccumulatorJs::parse(boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps) {
- /*
- * {$accumulator: {
- * init: <code>,
- * accumulate: <code>,
- * merge: <code>,
- * finalize: <code>,
- *
- * accumulateArgs: <expr>, // evaluated once per document
- *
- * initArgs: <expr>, // evaluated once per group
- *
- * lang: 'js',
- * }}
- */
- uassert(4544703,
- str::stream() << "$accumulator expects an object as an argument; found: "
- << typeName(elem.type()),
- elem.type() == BSONType::Object);
- BSONObj obj = elem.embeddedObject();
-
- std::string init, accumulate, merge, finalize;
- boost::intrusive_ptr<Expression> initArgs, accumulateArgs;
-
- for (auto&& element : obj) {
- auto name = element.fieldNameStringData();
- if (name == "init") {
- init = parseFunction("init", expCtx, element, vps);
- } else if (name == "accumulate") {
- accumulate = parseFunction("accumulate", expCtx, element, vps);
- } else if (name == "merge") {
- merge = parseFunction("merge", expCtx, element, vps);
- } else if (name == "finalize") {
- finalize = parseFunction("finalize", expCtx, element, vps);
- } else if (name == "initArgs") {
- initArgs = Expression::parseOperand(expCtx, element, vps);
- } else if (name == "accumulateArgs") {
- accumulateArgs = Expression::parseOperand(expCtx, element, vps);
- } else if (name == "lang") {
- uassert(4544704,
- str::stream() << "$accumulator lang must be a string; found: "
- << element.type(),
- element.type() == BSONType::String);
- uassert(4544705,
- "$accumulator only supports lang: 'js'",
- element.valueStringData() == "js");
- } else {
- // unexpected field
- uassert(
- 4544706, str::stream() << "$accumulator got an unexpected field: " << name, false);
- }
- }
- uassert(4544707, "$accumulator missing required argument 'init'", !init.empty());
- uassert(4544708, "$accumulator missing required argument 'accumulate'", !accumulate.empty());
- uassert(4544709, "$accumulator missing required argument 'merge'", !merge.empty());
- if (finalize.empty()) {
- // finalize is optional because many custom accumulators will return the final state
- // unchanged.
- finalize = "function(state) { return state; }";
- }
- if (!initArgs) {
- // initArgs is optional because most custom accumulators don't need the state to depend on
- // the group key.
- initArgs = ExpressionConstant::create(expCtx, Value(BSONArray()));
- }
- // accumulateArgs is required because it's the only way to communicate a value from the input
- // stream into the accumulator state.
- uassert(4544710, "$accumulator missing required argument 'accumulateArgs'", accumulateArgs);
-
- auto factory = [expCtx = expCtx,
- init = std::move(init),
- accumulate = std::move(accumulate),
- merge = std::move(merge),
- finalize = std::move(finalize)]() {
- return new AccumulatorJs(expCtx, init, accumulate, merge, finalize);
- };
- return {std::move(initArgs), std::move(accumulateArgs), std::move(factory)};
-}
-
-Value AccumulatorJs::getValue(bool toBeMerged) {
- // _state is initialized when we encounter the first document in each group. We never create
- // empty groups: even in a {$group: {_id: 1, ...}}, we will return zero groups rather than one
- // empty group.
- invariant(_state);
-
- // If toBeMerged then we return the current state, to be fed back in to accumulate / merge /
- // finalize later. If not toBeMerged then we return the final value, by calling finalize.
- if (toBeMerged) {
- return *_state;
- }
-
- // Get the final value given the current accumulator state.
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
- auto func = makeJsFunc(expCtx, _finalize);
-
- return jsExec->callFunction(func, BSON_ARRAY(*_state), {});
-}
-
-void AccumulatorJs::startNewGroup(Value const& input) {
- // Between groups the _state should be empty: we initialize it to be empty it in the
- // constructor, and we clear it at the end of each group (in .reset()).
- invariant(!_state);
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
- auto func = makeJsFunc(expCtx, _init);
-
- // input is a value produced by our AccumulationExpression::initializer.
- uassert(4544711,
- str::stream() << "$accumulator initArgs must evaluate to an array: "
- << input.toString(),
- input.getType() == BSONType::Array);
-
- size_t index = 0;
- BSONArrayBuilder bob;
- for (auto&& arg : input.getArray()) {
- arg.addToBsonArray(&bob, index++);
- }
-
- _state = jsExec->callFunction(func, bob.arr(), {});
-
- recomputeMemUsageBytes();
-}
-
-void AccumulatorJs::reset() {
- _state = std::nullopt;
- recomputeMemUsageBytes();
-}
-
-void AccumulatorJs::processInternal(const Value& input, bool merging) {
- // _state should be nonempty because we populate it in startNewGroup.
- invariant(_state);
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
-
- if (merging) {
- // input is an intermediate state from another instance of this kind of accumulator. Call
- // the user's merge function.
- auto func = makeJsFunc(expCtx, _merge);
- _state = jsExec->callFunction(func, BSON_ARRAY(*_state << input), {});
- recomputeMemUsageBytes();
- } else {
- // input is a value produced by our AccumulationExpression::argument. Call the user's
- // accumulate function.
- auto func = makeJsFunc(expCtx, _accumulate);
- uassert(4544712,
- str::stream() << "$accumulator accumulateArgs must evaluate to an array: "
- << input.toString(),
- input.getType() == BSONType::Array);
-
- size_t index = 0;
- BSONArrayBuilder bob;
- _state->addToBsonArray(&bob, index++);
- for (auto&& arg : input.getArray()) {
- arg.addToBsonArray(&bob, index++);
- }
-
- _state = jsExec->callFunction(func, bob.done(), {});
- recomputeMemUsageBytes();
- }
-}
-
-void AccumulatorJs::recomputeMemUsageBytes() {
- auto stateSize = _state.value_or(Value{}).getApproximateSize();
- uassert(4544713,
- str::stream() << "$accumulator state exceeded max BSON size: " << stateSize,
- stateSize <= BSONObjMaxUserSize);
- _memUsageBytes = sizeof(*this) + stateSize + _init.capacity() + _accumulate.capacity() +
- _merge.capacity() + _finalize.capacity();
-}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.h b/src/mongo/db/pipeline/accumulator_js_reduce.h
index 1bb15f948e8..bc2725a5567 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.h
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.h
@@ -38,19 +38,19 @@
namespace mongo {
-class AccumulatorInternalJsReduce final : public AccumulatorState {
+class AccumulatorInternalJsReduce final : public Accumulator {
public:
static constexpr auto kAccumulatorName = "$_internalJsReduce"_sd;
- static boost::intrusive_ptr<AccumulatorState> create(
+ static boost::intrusive_ptr<Accumulator> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource);
- static AccumulationExpression parseInternalJsReduce(
+ static std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> parseInternalJsReduce(
boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps);
AccumulatorInternalJsReduce(const boost::intrusive_ptr<ExpressionContext>& expCtx,
StringData funcSource)
- : AccumulatorState(expCtx), _funcSource(funcSource) {
+ : Accumulator(expCtx), _funcSource(funcSource) {
_memUsageBytes = sizeof(*this);
}
@@ -64,8 +64,7 @@ public:
void reset() final;
- virtual Document serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
+ virtual Document serialize(boost::intrusive_ptr<Expression> expression,
bool explain) const override;
private:
@@ -76,59 +75,4 @@ private:
Value _key;
};
-class AccumulatorJs final : public AccumulatorState {
-public:
- static constexpr auto kAccumulatorName = "$accumulator"_sd;
- const char* getOpName() const final {
- return kAccumulatorName.rawData();
- }
-
- // An AccumulatorState instance only owns its "static" arguments: those that don't need to be
- // evaluated per input document.
- static boost::intrusive_ptr<AccumulatorState> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::string init,
- std::string accumulate,
- std::string merge,
- std::string finalize);
-
- static AccumulationExpression parse(boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps);
-
- Value getValue(bool toBeMerged) final;
- void reset() final;
- void processInternal(const Value& input, bool merging) final;
-
- Document serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
- bool explain) const final;
- void startNewGroup(Value const& input) final;
-
-private:
- AccumulatorJs(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::string init,
- std::string accumulate,
- std::string merge,
- std::string finalize)
- : AccumulatorState(expCtx),
- _init(init),
- _accumulate(accumulate),
- _merge(merge),
- _finalize(finalize) {
- recomputeMemUsageBytes();
- }
- void recomputeMemUsageBytes();
-
- // static arguments
- std::string _init, _accumulate, _merge, _finalize;
-
- // accumulator state during execution
- // - When the accumulator is first created, _state is empty.
- // - When the accumulator is fed its first input Value, it runs the user init and accumulate
- // functions, and _state gets a Value.
- // - When the accumulator is reset, _state becomes empty again.
- std::optional<Value> _state;
-};
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_last.cpp b/src/mongo/db/pipeline/accumulator_last.cpp
index 14362e42cab..150360d4fdd 100644
--- a/src/mongo/db/pipeline/accumulator_last.cpp
+++ b/src/mongo/db/pipeline/accumulator_last.cpp
@@ -55,7 +55,7 @@ Value AccumulatorLast::getValue(bool toBeMerged) {
}
AccumulatorLast::AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : AccumulatorState(expCtx) {
+ : Accumulator(expCtx) {
_memUsageBytes = sizeof(*this);
}
@@ -64,7 +64,7 @@ void AccumulatorLast::reset() {
_last = Value();
}
-intrusive_ptr<AccumulatorState> AccumulatorLast::create(
+intrusive_ptr<Accumulator> AccumulatorLast::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorLast(expCtx);
}
diff --git a/src/mongo/db/pipeline/accumulator_merge_objects.cpp b/src/mongo/db/pipeline/accumulator_merge_objects.cpp
index 6b23ae528a1..d1e6310ea23 100644
--- a/src/mongo/db/pipeline/accumulator_merge_objects.cpp
+++ b/src/mongo/db/pipeline/accumulator_merge_objects.cpp
@@ -49,14 +49,14 @@ const char* AccumulatorMergeObjects::getOpName() const {
return "$mergeObjects";
}
-intrusive_ptr<AccumulatorState> AccumulatorMergeObjects::create(
+intrusive_ptr<Accumulator> AccumulatorMergeObjects::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorMergeObjects(expCtx);
}
AccumulatorMergeObjects::AccumulatorMergeObjects(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : AccumulatorState(expCtx) {
+ : Accumulator(expCtx) {
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp
index 265a84b7075..c1af0ff8226 100644
--- a/src/mongo/db/pipeline/accumulator_min_max.cpp
+++ b/src/mongo/db/pipeline/accumulator_min_max.cpp
@@ -71,7 +71,7 @@ Value AccumulatorMinMax::getValue(bool toBeMerged) {
AccumulatorMinMax::AccumulatorMinMax(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Sense sense)
- : AccumulatorState(expCtx), _sense(sense) {
+ : Accumulator(expCtx), _sense(sense) {
_memUsageBytes = sizeof(*this);
}
@@ -80,12 +80,12 @@ void AccumulatorMinMax::reset() {
_memUsageBytes = sizeof(*this);
}
-intrusive_ptr<AccumulatorState> AccumulatorMin::create(
+intrusive_ptr<Accumulator> AccumulatorMin::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorMin(expCtx);
}
-intrusive_ptr<AccumulatorState> AccumulatorMax::create(
+intrusive_ptr<Accumulator> AccumulatorMax::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorMax(expCtx);
}
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index 3a004dd9731..d13a942ef62 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -83,7 +83,7 @@ Value AccumulatorPush::getValue(bool toBeMerged) {
AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<int> maxMemoryUsageBytes)
- : AccumulatorState(expCtx),
+ : Accumulator(expCtx),
_maxMemUsageBytes(maxMemoryUsageBytes.value_or(internalQueryMaxPushBytes.load())) {
_memUsageBytes = sizeof(*this);
}
@@ -93,7 +93,7 @@ void AccumulatorPush::reset() {
_memUsageBytes = sizeof(*this);
}
-intrusive_ptr<AccumulatorState> AccumulatorPush::create(
+intrusive_ptr<Accumulator> AccumulatorPush::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorPush(expCtx, boost::none);
}
diff --git a/src/mongo/db/pipeline/accumulator_std_dev.cpp b/src/mongo/db/pipeline/accumulator_std_dev.cpp
index cdd31b2c897..55367d766be 100644
--- a/src/mongo/db/pipeline/accumulator_std_dev.cpp
+++ b/src/mongo/db/pipeline/accumulator_std_dev.cpp
@@ -96,20 +96,20 @@ Value AccumulatorStdDev::getValue(bool toBeMerged) {
}
}
-intrusive_ptr<AccumulatorState> AccumulatorStdDevSamp::create(
+intrusive_ptr<Accumulator> AccumulatorStdDevSamp::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorStdDevSamp(expCtx);
}
-intrusive_ptr<AccumulatorState> AccumulatorStdDevPop::create(
+intrusive_ptr<Accumulator> AccumulatorStdDevPop::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorStdDevPop(expCtx);
}
AccumulatorStdDev::AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx,
bool isSamp)
- : AccumulatorState(expCtx), _isSamp(isSamp), _count(0), _mean(0), _m2(0) {
- // This is a fixed size AccumulatorState so we never need to update this
+ : Accumulator(expCtx), _isSamp(isSamp), _count(0), _mean(0), _m2(0) {
+ // This is a fixed size Accumulator so we never need to update this
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_sum.cpp b/src/mongo/db/pipeline/accumulator_sum.cpp
index 182f592ce3f..6cd34d8c76f 100644
--- a/src/mongo/db/pipeline/accumulator_sum.cpp
+++ b/src/mongo/db/pipeline/accumulator_sum.cpp
@@ -85,7 +85,7 @@ void AccumulatorSum::processInternal(const Value& input, bool merging) {
}
}
-intrusive_ptr<AccumulatorState> AccumulatorSum::create(
+intrusive_ptr<Accumulator> AccumulatorSum::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
return new AccumulatorSum(expCtx);
}
@@ -128,8 +128,8 @@ Value AccumulatorSum::getValue(bool toBeMerged) {
}
AccumulatorSum::AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : AccumulatorState(expCtx) {
- // This is a fixed size AccumulatorState so we never need to update this.
+ : Accumulator(expCtx) {
+ // This is a fixed size Accumulator so we never need to update this.
_memUsageBytes = sizeof(*this);
}
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index 724e6a6838a..373a24a4155 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -46,9 +46,9 @@ using std::numeric_limits;
using std::string;
/**
- * Takes the name of an AccumulatorState as its template argument and a list of pairs of arguments
- * and expected results as its second argument, and asserts that for the given AccumulatorState the
- * arguments evaluate to the expected results.
+ * Takes the name of an Accumulator as its template argument and a list of pairs of arguments and
+ * expected results as its second argument, and asserts that for the given Accumulator the arguments
+ * evaluate to the expected results.
*/
template <typename AccName>
static void assertExpectedResults(
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index c6decbca69c..eba38c5c58b 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -109,25 +109,13 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() {
return makeDocument(*(_bucketsIterator++));
}
-boost::intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::optimize() {
- _groupByExpression = _groupByExpression->optimize();
- for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expr.argument = accumulatedField.expr.argument->optimize();
- accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize();
- }
- return this;
-}
-
DepsTracker::State DocumentSourceBucketAuto::getDependencies(DepsTracker* deps) const {
// Add the 'groupBy' expression.
_groupByExpression->addDependencies(deps);
// Add the 'output' fields.
for (auto&& accumulatedField : _accumulatedFields) {
- // Anything the per-doc expression depends on, the whole stage depends on.
- accumulatedField.expr.argument->addDependencies(deps);
- // The initializer should be an ExpressionConstant, or something that optimizes to one.
- // ExpressionConstant doesn't have dependencies.
+ accumulatedField.expression->addDependencies(deps);
}
// We know exactly which fields will be present in the output document. Future stages cannot
@@ -201,8 +189,7 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>&
const size_t numAccumulators = _accumulatedFields.size();
for (size_t k = 0; k < numAccumulators; k++) {
bucket._accums[k]->process(
- _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables),
- false);
+ _accumulatedFields[k].expression->evaluate(entry.second, &pExpCtx->variables), false);
}
}
@@ -247,16 +234,6 @@ void DocumentSourceBucketAuto::populateBuckets() {
// Initialize the current bucket.
Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields);
- // Evaluate each initializer against an empty document. Normally the
- // initializer can refer to the group key, but in $bucketAuto there is no single
- // group key per bucket.
- Document emptyDoc;
- for (size_t k = 0; k < _accumulatedFields.size(); ++k) {
- Value initializerValue =
- _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables);
- currentBucket._accums[k]->startNewGroup(initializerValue);
- }
-
// Add the first value into the current bucket.
addDocumentToBucket(currentValue, currentBucket);
@@ -405,11 +382,10 @@ Value DocumentSourceBucketAuto::serialize(
MutableDocument outputSpec(_accumulatedFields.size());
for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator();
+ intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator();
outputSpec[accumulatedField.fieldName] =
- Value(accum->serialize(accumulatedField.expr.initializer,
- accumulatedField.expr.argument,
- static_cast<bool>(explain)));
+ Value{Document{{accum->getOpName(),
+ accumulatedField.expression->serialize(static_cast<bool>(explain))}}};
}
insides["output"] = outputSpec.freezeToValue();
@@ -429,11 +405,9 @@ intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create(
numBuckets > 0);
// If there is no output field specified, then add the default one.
if (accumulationStatements.empty()) {
- accumulationStatements.emplace_back(
- "count",
- AccumulationExpression(ExpressionConstant::create(pExpCtx, Value(BSONNULL)),
- ExpressionConstant::create(pExpCtx, Value(1)),
- [pExpCtx] { return AccumulatorSum::create(pExpCtx); }));
+ accumulationStatements.emplace_back("count",
+ ExpressionConstant::create(pExpCtx, Value(1)),
+ [pExpCtx] { return AccumulatorSum::create(pExpCtx); });
}
return new DocumentSourceBucketAuto(pExpCtx,
groupByExpression,
@@ -512,13 +486,8 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson(
argument.type() == BSONType::Object);
for (auto&& outputField : argument.embeddedObject()) {
- auto stmt =
- AccumulationStatement::parseAccumulationStatement(pExpCtx, outputField, vps);
- stmt.expr.initializer = stmt.expr.initializer->optimize();
- uassert(4544714,
- "Can't refer to the group key in $bucketAuto",
- ExpressionConstant::isNullOrConstant(stmt.expr.initializer));
- accumulationStatements.push_back(std::move(stmt));
+ accumulationStatements.push_back(
+ AccumulationStatement::parseAccumulationStatement(pExpCtx, outputField, vps));
}
} else if ("granularity" == argName) {
uassert(40261,
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index fcc35a130d3..8804b0df6c1 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -49,7 +49,6 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
const char* getSourceName() const final;
- boost::intrusive_ptr<DocumentSource> optimize() final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
return {StreamType::kBlocking,
@@ -115,7 +114,7 @@ private:
const std::vector<AccumulationStatement>& accumulationStatements);
Value _min;
Value _max;
- std::vector<boost::intrusive_ptr<AccumulatorState>> _accums;
+ std::vector<boost::intrusive_ptr<Accumulator>> _accums;
};
/**
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 8268f0a1dd1..0f961f8c363 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -158,17 +158,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() {
_currentId = _firstPartOfNextGroup.first;
const size_t numAccumulators = _accumulatedFields.size();
-
- // Call startNewGroup on every accumulator.
- Value expandedId = expandId(_currentId);
- Document idDoc =
- expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
- for (size_t i = 0; i < numAccumulators; ++i) {
- Value initializerValue =
- _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables);
- _currentAccumulators[i]->startNewGroup(initializerValue);
- }
-
while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) {
// Inside of this loop, _firstPartOfNextGroup is the current data being processed.
// At loop exit, it is the first value to be processed in the next group.
@@ -227,8 +216,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
}
for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize();
- accumulatedField.expr.argument = accumulatedField.expr.argument->optimize();
+ accumulatedField.expression = accumulatedField.expression->optimize();
}
return this;
@@ -253,11 +241,9 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
// Add the remaining fields.
for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator();
+ intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator();
insides[accumulatedField.fieldName] =
- Value(accum->serialize(accumulatedField.expr.initializer,
- accumulatedField.expr.argument,
- static_cast<bool>(explain)));
+ Value(accum->serialize(accumulatedField.expression, static_cast<bool>(explain)));
}
if (_doingMerge) {
@@ -277,8 +263,7 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const
// add the rest
for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expr.argument->addDependencies(deps);
- // Don't add initializer, because it doesn't refer to docs from the input stream.
+ accumulatedField.expression->addDependencies(deps);
}
return DepsTracker::State::EXHAUSTIVE_ALL;
@@ -500,23 +485,16 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and
// looking it up in '_groups' multiple times.
const size_t oldSize = _groups->size();
- vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id];
+ vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id];
const bool inserted = _groups->size() != oldSize;
if (inserted) {
_memoryUsageBytes += id.getApproximateSize();
- // Initialize and add the accumulators
- Value expandedId = expandId(id);
- Document idDoc =
- expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
+ // Add the accumulators
group.reserve(numAccumulators);
for (auto&& accumulatedField : _accumulatedFields) {
- auto accum = accumulatedField.makeAccumulator();
- Value initializerValue =
- accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables);
- accum->startNewGroup(initializerValue);
- group.push_back(accum);
+ group.push_back(accumulatedField.makeAccumulator());
}
} else {
for (auto&& groupObj : group) {
@@ -530,7 +508,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
for (size_t i = 0; i < numAccumulators; i++) {
group[i]->process(
- _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
+ _accumulatedFields[i].expression->evaluate(rootDocument, &pExpCtx->variables),
_doingMerge);
_memoryUsageBytes += group[i]->memUsageForSorter();
@@ -715,7 +693,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr
// original accumulator may be collecting an expression based on a field expression or
// constant. Here, we accumulate the output of the same name from the prior group.
auto copiedAccumulatedField = accumulatedField;
- copiedAccumulatedField.expr.argument =
+ copiedAccumulatedField.expression =
ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + copiedAccumulatedField.fieldName, vps);
mergingGroup->addAccumulator(copiedAccumulatedField);
}
@@ -797,10 +775,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
fields.push_back(std::make_pair("_id", ExpressionFieldPath::create(pExpCtx, groupId)));
for (auto&& accumulator : _accumulatedFields) {
- fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument));
-
- // Since we don't attempt this transformation for non-$first accumulators,
- // the initializer should always be trivial.
+ fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expression));
}
return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields));
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 5d885e84a1f..88b0a669b6e 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -88,7 +88,7 @@ private:
class DocumentSourceGroup final : public DocumentSource {
public:
- using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>;
+ using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
using GroupsMap = ValueUnorderedMap<Accumulators>;
static constexpr StringData kStageName = "$group"_sd;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index c3c764c4f6a..a2ed4358248 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -75,8 +75,9 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) {
// This is the only way to do this in a debug build.
auto&& parser = AccumulationStatement::getParser("$sum");
auto accumulatorArg = BSON("" << 1);
- auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
- AccumulationStatement countStatement{"count", accExpr};
+ auto [expression, factory] =
+ parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
+ AccumulationStatement countStatement{"count", expression, factory};
auto group = DocumentSourceGroup::create(
expCtx, ExpressionConstant::create(expCtx, Value(BSONNULL)), {countStatement});
auto mock =
@@ -112,8 +113,9 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
auto&& parser = AccumulationStatement::getParser("$push");
auto accumulatorArg = BSON(""
<< "$largeStr");
- auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
- AccumulationStatement pushStatement{"spaceHog", accExpr};
+ auto [expression, factory] =
+ parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
+ AccumulationStatement pushStatement{"spaceHog", expression, factory};
auto groupByExpression =
ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState);
auto group = DocumentSourceGroup::create(
@@ -154,8 +156,9 @@ TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSet
auto&& parser = AccumulationStatement::getParser("$push");
auto accumulatorArg = BSON(""
<< "$largeStr");
- auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
- AccumulationStatement pushStatement{"spaceHog", accExpr};
+ auto [expression, factory] =
+ parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
+ AccumulationStatement pushStatement{"spaceHog", expression, factory};
auto groupByExpression =
ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState);
auto group = DocumentSourceGroup::create(
@@ -178,8 +181,9 @@ TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto&& parser = AccumulationStatement::getParser("$push");
auto accumulatorArg = BSON(""
<< "$largeStr");
- auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
- AccumulationStatement pushStatement{"spaceHog", accExpr};
+ auto [expression, factory] =
+ parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState);
+ AccumulationStatement pushStatement{"spaceHog", expression, factory};
auto groupByExpression =
ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState);
auto group = DocumentSourceGroup::create(
diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h
index 4f772ca2363..b05b4dcd918 100644
--- a/src/mongo/db/pipeline/expression.h
+++ b/src/mongo/db/pipeline/expression.h
@@ -399,15 +399,15 @@ public:
* Used to make Accumulators available as Expressions, e.g., to make $sum available as an Expression
* use "REGISTER_EXPRESSION(sum, ExpressionAccumulator<AccumulatorSum>::parse);".
*/
-template <typename AccumulatorState>
+template <typename Accumulator>
class ExpressionFromAccumulator
- : public ExpressionVariadic<ExpressionFromAccumulator<AccumulatorState>> {
+ : public ExpressionVariadic<ExpressionFromAccumulator<Accumulator>> {
public:
explicit ExpressionFromAccumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : ExpressionVariadic<ExpressionFromAccumulator<AccumulatorState>>(expCtx) {}
+ : ExpressionVariadic<ExpressionFromAccumulator<Accumulator>>(expCtx) {}
Value evaluate(const Document& root, Variables* variables) const final {
- AccumulatorState accum(this->getExpressionContext());
+ Accumulator accum(this->getExpressionContext());
const auto n = this->_children.size();
// If a single array arg is given, loop through it passing each member to the accumulator.
// If a single, non-array arg is given, pass it directly to the accumulator.
@@ -435,15 +435,15 @@ public:
if (this->_children.size() == 1) {
return false;
}
- return AccumulatorState(this->getExpressionContext()).isAssociative();
+ return Accumulator(this->getExpressionContext()).isAssociative();
}
bool isCommutative() const final {
- return AccumulatorState(this->getExpressionContext()).isCommutative();
+ return Accumulator(this->getExpressionContext()).isCommutative();
}
const char* getOpName() const final {
- return AccumulatorState(this->getExpressionContext()).getOpName();
+ return Accumulator(this->getExpressionContext()).getOpName();
}
void acceptVisitor(ExpressionVisitor* visitor) final {
diff --git a/src/mongo/db/pipeline/expression_visitor.h b/src/mongo/db/pipeline/expression_visitor.h
index 393778e5f21..326c8e2e6ce 100644
--- a/src/mongo/db/pipeline/expression_visitor.h
+++ b/src/mongo/db/pipeline/expression_visitor.h
@@ -158,7 +158,7 @@ class AccumulatorStdDevPop;
class AccumulatorStdDevSamp;
class AccumulatorSum;
class AccumulatorMergeObjects;
-template <typename AccumulatorState>
+template <typename Accumulator>
class ExpressionFromAccumulator;
/**