diff options
27 files changed, 167 insertions, 814 deletions
diff --git a/jstests/aggregation/accumulators/accumulator_js.js b/jstests/aggregation/accumulators/accumulator_js.js deleted file mode 100644 index 2942ef641dd..00000000000 --- a/jstests/aggregation/accumulators/accumulator_js.js +++ /dev/null @@ -1,127 +0,0 @@ -// Test the behavior of user-defined (Javascript) accumulators. -(function() { -"use strict"; - -load('jstests/aggregation/extras/utils.js'); - -db.accumulator_js.drop(); - -for (const word of ["hello", "world", "world", "hello", "hi"]) { - db.accumulator_js.insert({word: word, val: 1}); -} - -const command = { - aggregate: 'accumulator_js', - cursor: {}, - pipeline: [{ - $group: { - _id: "$word", - wordCount: { - $accumulator: { - init: function() { - return 0; - }, - accumulateArgs: ["$val"], - accumulate: function(state, val) { - return state + val; - }, - merge: function(state1, state2) { - return state1 + state2; - }, - finalize: function(state) { - return state; - } - } - } - } - }], -}; - -const expectedResults = [ - {_id: "hello", wordCount: 2}, - {_id: "world", wordCount: 2}, - {_id: "hi", wordCount: 1}, -]; - -let res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor); - -// Test that the functions can be passed as strings. -{ - const accumulatorSpec = command.pipeline[0].$group.wordCount.$accumulator; - accumulatorSpec.init = accumulatorSpec.init.toString(); - accumulatorSpec.accumulate = accumulatorSpec.accumulate.toString(); - accumulatorSpec.merge = accumulatorSpec.merge.toString(); - accumulatorSpec.finalize = accumulatorSpec.finalize.toString(); -} -res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor); - -// Test that finalize is optional. -delete command.pipeline[0].$group.wordCount.$accumulator.finalize; -res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor); - -// Test a finalizer other than the identity function. Finalizers are useful when the intermediate -// state needs to be a different format from the final result. -res = assert.commandWorked(db.runCommand(Object.merge(command, { - pipeline: [{ - $group: { - _id: 1, - avgWordLen: { - $accumulator: { - init: function() { - return {count: 0, sum: 0}; - }, - accumulateArgs: [{$strLenCP: "$word"}], - accumulate: function({count, sum}, wordLen) { - return {count: count + 1, sum: sum + wordLen}; - }, - merge: function(s1, s2) { - return {count: s1.count + s2.count, sum: s1.sum + s2.sum}; - }, - finalize: function({count, sum}) { - return sum / count; - }, - lang: 'js', - } - }, - } - }], -}))); -assert(resultsEq(res.cursor.firstBatch, [{_id: 1, avgWordLen: 22 / 5}]), res.cursor); - -// Test that a null word is considered a valid value. -assert.commandWorked(db.accumulator_js.insert({word: null, val: 1})); -expectedResults.push({_id: null, wordCount: 1}); -res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor); - -// Test that missing fields become JS null. -// This is similar to how most other agg operators work. -// TODO SERVER-45450 is this a problem for mapreduce? -assert(db.accumulator_js.drop()); -assert.commandWorked(db.accumulator_js.insert({sentinel: 1})); -command.pipeline = [{ - $group: { - _id: 1, - value: { - $accumulator: { - init: function() { - return []; - }, - accumulateArgs: ["$no_such_field"], - accumulate: function(state, value) { - return state.concat([value]); - }, - merge: function(s1, s2) { - return s1.concat(s2); - }, - lang: 'js', - } - } - } -}]; -res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, [{_id: 1, value: [null]}]), res.cursor); -})(); diff --git a/jstests/aggregation/accumulators/accumulator_js_size_limits.js b/jstests/aggregation/accumulators/accumulator_js_size_limits.js deleted file mode 100644 index 80bbcc49e07..00000000000 --- a/jstests/aggregation/accumulators/accumulator_js_size_limits.js +++ /dev/null @@ -1,87 +0,0 @@ -// Test several different kinds of size limits on user-defined (Javascript) accumulators. -(function() { -"use strict"; - -const coll = db.accumulator_js_size_limits; - -function runExample(groupKey, accumulatorSpec) { - return coll.runCommand({ - aggregate: coll.getName(), - cursor: {}, - pipeline: [{ - $group: { - _id: groupKey, - accumulatedField: {$accumulator: accumulatorSpec}, - } - }] - }); -} - -// Accumulator tries to create too long a String; it can't be serialized to BSON. -coll.drop(); -assert.commandWorked(coll.insert({})); -let res = runExample(1, { - init: function() { - return "a".repeat(20 * 1024 * 1024); - }, - accumulate: function() { - throw 'accumulate should not be called'; - }, - accumulateArgs: [], - merge: function() { - throw 'merge should not be called'; - }, - finalize: function() { - throw 'finalize should not be called'; - }, - lang: 'js', -}); -assert.commandFailedWithCode(res, [10334]); - -// Accumulator tries to return BSON larger than 16MB from JS. -assert(coll.drop()); -assert.commandWorked(coll.insert({})); -res = runExample(1, { - init: function() { - const str = "a".repeat(1 * 1024 * 1024); - return Array.from({length: 20}, () => str); - }, - accumulate: function() { - throw 'accumulate should not be called'; - }, - accumulateArgs: [], - merge: function() { - throw 'merge should not be called'; - }, - finalize: function() { - throw 'finalize should not be called'; - }, - lang: 'js', -}); -assert.commandFailedWithCode(res, [17260]); - -// $group size limit exceeded, and cannot spill. -assert(coll.drop()); -assert.commandWorked(coll.insert(Array.from({length: 200}, (_, i) => ({_id: i})))); -// By grouping on _id, each group contains only 1 document. This means it creates many -// AccumulatorState instances. -res = runExample("$_id", { - init: function() { - // Each accumulator state is big enough to be expensive, but not big enough to hit the BSON - // size limit. - return "a".repeat(1 * 1024 * 1024); - }, - accumulate: function(state) { - return state; - }, - accumulateArgs: [1], - merge: function(state1, state2) { - return state1; - }, - finalize: function(state) { - return state.length; - }, - lang: 'js', -}); -assert.commandFailedWithCode(res, [16945]); -})(); diff --git a/jstests/aggregation/accumulators/internal_js_reduce.js b/jstests/aggregation/accumulators/internal_js_reduce.js index b315adacf70..ae470b0950f 100644 --- a/jstests/aggregation/accumulators/internal_js_reduce.js +++ b/jstests/aggregation/accumulators/internal_js_reduce.js @@ -43,7 +43,7 @@ const expectedResults = [ ]; let res = assert.commandWorked(db.runCommand(command)); -assert(resultsEq(res.cursor.firstBatch, expectedResults), res.cursor); +assert(resultsEq(res.cursor.firstBatch, expectedResults, res.cursor)); // // Test that the reduce function also accepts a string argument. diff --git a/jstests/sharding/javascript_heap_limit.js b/jstests/sharding/javascript_heap_limit.js index f92907fb6b3..81cf5a8d8f2 100644 --- a/jstests/sharding/javascript_heap_limit.js +++ b/jstests/sharding/javascript_heap_limit.js @@ -49,7 +49,7 @@ const aggregateWithJSFunction = { {$project: {y: {"$function": {args: [], body: allocateLargeString, lang: "js"}}}} ] }; -const aggregateWithInternalJsReduce = { +const aggregateWithJSAccumulator = { aggregate: "coll", cursor: {}, pipeline: [{ @@ -64,24 +64,6 @@ const aggregateWithInternalJsReduce = { } }] }; -const aggregateWithUserDefinedAccumulator = { - aggregate: "coll", - cursor: {}, - pipeline: [{ - $group: { - _id: "$x", - value: { - $accumulator: { - init: allocateLargeString, - accumulate: allocateLargeString, - accumulateArgs: [{k: "$x", v: "$x"}], - merge: allocateLargeString, - lang: 'js', - } - } - } - }] -}; const findWithJavaScriptFunction = { find: "coll", filter: {$expr: {"$function": {args: [], body: allocateLargeString, lang: "js"}}} @@ -100,17 +82,14 @@ function runCommonTests(db) { // All commands are expected to work with a sufficient JS heap size. setHeapSizeLimitMB({db: db, queryLimit: sufficentHeapSizeMB, globalLimit: sufficentHeapSizeMB}); assert.commandWorked(db.runCommand(aggregateWithJSFunction)); - assert.commandWorked(db.runCommand(aggregateWithInternalJsReduce)); - assert.commandWorked(db.runCommand(aggregateWithUserDefinedAccumulator)); + assert.commandWorked(db.runCommand(aggregateWithJSAccumulator)); // The aggregate command is expected to fail when the aggregation specific heap size limit is // too low. setHeapSizeLimitMB({db: db, queryLimit: tooSmallHeapSizeMB, globalLimit: sufficentHeapSizeMB}); assert.commandFailedWithCode(db.runCommand(aggregateWithJSFunction), ErrorCodes.JSInterpreterFailure); - assert.commandFailedWithCode(db.runCommand(aggregateWithInternalJsReduce), - ErrorCodes.JSInterpreterFailure); - assert.commandFailedWithCode(db.runCommand(aggregateWithUserDefinedAccumulator), + assert.commandFailedWithCode(db.runCommand(aggregateWithJSAccumulator), ErrorCodes.JSInterpreterFailure); // All commands are expected to fail when the global heap size limit is too low, regardless @@ -118,9 +97,7 @@ function runCommonTests(db) { setHeapSizeLimitMB({db: db, queryLimit: sufficentHeapSizeMB, globalLimit: tooSmallHeapSizeMB}); assert.commandFailedWithCode(db.runCommand(aggregateWithJSFunction), ErrorCodes.JSInterpreterFailure); - assert.commandFailedWithCode(db.runCommand(aggregateWithInternalJsReduce), - ErrorCodes.JSInterpreterFailure); - assert.commandFailedWithCode(db.runCommand(aggregateWithUserDefinedAccumulator), + assert.commandFailedWithCode(db.runCommand(aggregateWithJSAccumulator), ErrorCodes.JSInterpreterFailure); } diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index 52cd8bf0434..445621c1ad9 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -124,19 +124,15 @@ auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string co } auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) { - auto initializer = ExpressionArray::create(expCtx, {}); - auto argument = ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState); - auto reduceFactory = [expCtx, funcSource = std::move(code)]() { + auto accumulatorArgument = + ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState); + auto reduceFactory = [expCtx, funcSource = code]() { return AccumulatorInternalJsReduce::create(expCtx, funcSource); }; - AccumulationStatement jsReduce("value", - AccumulationExpression(std::move(initializer), - std::move(argument), - std::move(reduceFactory))); - auto groupKeyExpression = - ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); + AccumulationStatement jsReduce("value", std::move(accumulatorArgument), reduceFactory); + auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); return DocumentSourceGroup::create(expCtx, - std::move(groupKeyExpression), + std::move(groupExpr), make_vector<AccumulationStatement>(std::move(jsReduce)), boost::none); } diff --git a/src/mongo/db/pipeline/accumulation_statement.cpp b/src/mongo/db/pipeline/accumulation_statement.cpp index 72b2b80a6c6..18595477227 100644 --- a/src/mongo/db/pipeline/accumulation_statement.cpp +++ b/src/mongo/db/pipeline/accumulation_statement.cpp @@ -65,8 +65,8 @@ AccumulationStatement::Parser& AccumulationStatement::getParser(StringData name) return it->second; } -boost::intrusive_ptr<AccumulatorState> AccumulationStatement::makeAccumulator() const { - return expr.factory(); +boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator() const { + return _factory(); } AccumulationStatement AccumulationStatement::parseAccumulationStatement( @@ -98,10 +98,9 @@ AccumulationStatement AccumulationStatement::parseAccumulationStatement( specElem.type() != BSONType::Array); auto&& parser = AccumulationStatement::getParser(accName); - auto [initializer, argument, factory] = parser(expCtx, specElem, vps); + auto [expression, factory] = parser(expCtx, specElem, vps); - return AccumulationStatement(fieldName.toString(), - AccumulationExpression(initializer, argument, factory)); + return AccumulationStatement(fieldName.toString(), expression, factory); } } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulation_statement.h b/src/mongo/db/pipeline/accumulation_statement.h index fff666141d3..91ac3a1aff3 100644 --- a/src/mongo/db/pipeline/accumulation_statement.h +++ b/src/mongo/db/pipeline/accumulation_statement.h @@ -38,8 +38,8 @@ namespace mongo { /** - * Registers an AccumulatorState to have the name 'key'. When an accumulator with name '$key' is - * found during parsing, 'factory' will be called to construct the AccumulatorState. + * Registers an Accumulator to have the name 'key'. When an accumulator with name '$key' is found + * during parsing, 'factory' will be called to construct the Accumulator. * * As an example, if your accumulator looks like {"$foo": <args>}, with a factory method 'create', * you would add this line: @@ -52,102 +52,26 @@ namespace mongo { } /** - * AccumulatorExpression represents the right-hand side of an AccumulationStatement. Note this is - * different from Expression; they are different nonterminals in the grammar. - * - * For example, in - * {$group: { - * _id: 1, - * count: {$sum: {$size: "$tags"}} - * }} - * - * we would say: - * The AccumulationStatement is count: {$sum: {$size: "$tags"}} - * The AccumulationExpression is {$sum: {$size: "$tags"}} - * The AccumulatorState::Factory is $sum - * The argument Expression is {$size: "$tags"} - * There is no initializer Expression. - * - * "$sum" corresponds to an AccumulatorState::Factory rather than AccumulatorState because - * AccumulatorState is an execution concept, not an AST concept: each instance of AccumulatorState - * contains intermediate values being accumulated. - * - * Like most accumulators, $sum does not require or accept an initializer Expression. At time of - * writing, only user-defined accumulators accept an initializer. - * - * For example, in: - * {$group: { - * _id: {cc: "$country_code"}, - * top_stories: {$accumulator: { - * init: function(cc) { ... }, - * initArgs: ["$cc"], - * accumulate: function(state, title, upvotes) { ... }, - * accumulateArgs: ["$title", "$upvotes"], - * merge: function(state1, state2) { ... }, - * lang: "js", - * }} - * }} - * - * we would say: - * The AccumulationStatement is top_stories: {$accumulator: ... } - * The AccumulationExpression is {$accumulator: ... } - * The argument Expression is ["$cc"] - * The initializer Expression is ["$title", "$upvotes"] - * The AccumulatorState::Factory holds all the other arguments to $accumulator. - * - */ -struct AccumulationExpression { - AccumulationExpression(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, - AccumulatorState::Factory factory) - : initializer(initializer), argument(argument), factory(factory) { - invariant(this->initializer); - invariant(this->argument); - } - - // The expression to use to obtain the input to the accumulator. - boost::intrusive_ptr<Expression> initializer; - - // An expression evaluated once per input document, and passed to AccumulatorState::process. - boost::intrusive_ptr<Expression> argument; - - // Constructs an AccumulatorState to do actual accumulation. - boost::intrusive_ptr<AccumulatorState> makeAccumulator() const; - - // A no argument function object that can be called to create an AccumulatorState. - const AccumulatorState::Factory factory; -}; - -/** - * A default parser for any accumulator that only takes a single expression as an argument. Returns - * the expression to be evaluated by the accumulator and an AccumulatorState::Factory. - */ -template <class AccName> -AccumulationExpression genericParseSingleExpressionAccumulator( - boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) { - auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL)); - auto argument = Expression::parseOperand(expCtx, elem, vps); - return {initializer, argument, [expCtx]() { return AccName::create(expCtx); }}; -} - -/** * A class representing a user-specified accumulation, including the field name to put the * accumulated result in, which accumulator to use, and the expression used to obtain the input to - * the AccumulatorState. + * the Accumulator. */ class AccumulationStatement { public: - using Parser = std::function<AccumulationExpression( + using Parser = std::function<std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>( boost::intrusive_ptr<ExpressionContext>, BSONElement, VariablesParseState)>; - - AccumulationStatement(std::string fieldName, AccumulationExpression expr) - : fieldName(std::move(fieldName)), expr(std::move(expr)) {} + AccumulationStatement(std::string fieldName, + boost::intrusive_ptr<Expression> expression, + Accumulator::Factory factory) + : fieldName(std::move(fieldName)), + expression(std::move(expression)), + _factory(std::move(factory)) {} /** * Parses a BSONElement that is an accumulated field, and returns an AccumulationStatement for * that accumulated field. * - * Throws an AssertionException if parsing fails. + * Throws a AssertionException if parsing fails. */ static AccumulationStatement parseAccumulationStatement( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -155,9 +79,9 @@ public: const VariablesParseState& vps); /** - * Registers an AccumulatorState with a parsing function, so that when an accumulator with the - * given name is encountered during parsing, we will know to call 'factory' to construct that - * AccumulatorState. + * Registers an Accumulator with a parsing function, so that when an accumulator with the given + * name is encountered during parsing, we will know to call 'factory' to construct that + * Accumulator. * * DO NOT call this method directly. Instead, use the REGISTER_ACCUMULATOR macro defined in this * file. @@ -166,17 +90,22 @@ public: /** * Retrieves the Parser for the accumulator specified by the given name, and raises an error if - * there is no such AccumulatorState registered. + * there is no such Accumulator registered. */ static Parser& getParser(StringData name); // The field name is used to store the results of the accumulation in a result document. std::string fieldName; - AccumulationExpression expr; + // The expression to use to obtain the input to the accumulator. + boost::intrusive_ptr<Expression> expression; + + // Constructs an Accumulator to do actual accumulation. + boost::intrusive_ptr<Accumulator> makeAccumulator() const; - // Constructs an AccumulatorState to do actual accumulation. - boost::intrusive_ptr<AccumulatorState> makeAccumulator() const; +private: + // A no argument function object that can be called to create an Accumulator. + const Accumulator::Factory _factory; }; diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index e59ccb543da..cb58bc7a3b1 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -51,28 +51,23 @@ namespace mongo { * This enum indicates which documents an accumulator needs to see in order to compute its output. */ enum class AccumulatorDocumentsNeeded { - // AccumulatorState needs to see all documents in a group. + // Accumulator needs to see all documents in a group. kAllDocuments, - // AccumulatorState only needs to see one document in a group, and when there is a sort order, - // that document must be the first document. + // Accumulator only needs to see one document in a group, and when there is a sort order, that + // document must be the first document. kFirstDocument, - // AccumulatorState only needs to see one document in a group, and when there is a sort order, - // that document must be the last document. + // Accumulator only needs to see one document in a group, and when there is a sort order, that + // document must be the last document. kLastDocument, }; -class AccumulatorState : public RefCountable { +class Accumulator : public RefCountable { public: - using Factory = std::function<boost::intrusive_ptr<AccumulatorState>()>; + using Factory = std::function<boost::intrusive_ptr<Accumulator>()>; - AccumulatorState(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} - - /** Marks the beginning of a new group. The input is the result of evaluating - * AccumulatorExpression::initializer, which can read from the group key. - */ - virtual void startNewGroup(const Value& input) {} + Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} /** Process input and update internal state. * merging should be true when processing outputs from getValue(true). @@ -94,7 +89,7 @@ public: return _memUsageBytes; } - /// Reset this accumulator to a fresh state, ready for a new call to startNewGroup. + /// Reset this accumulator to a fresh state ready to receive input. virtual void reset() = 0; virtual bool isAssociative() const { @@ -114,19 +109,9 @@ public: * * When executing on a sharded cluster, the result of this function will be sent to each * individual shard. - * - * This implementation assumes the accumulator has the simple syntax { <name>: <argument> }, - * such as { $sum: <argument> }. This syntax has no room for an initializer. Subclasses with a - * more elaborate syntax such should override this method. */ - virtual Document serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, - bool explain) const { - ExpressionConstant const* ec = dynamic_cast<ExpressionConstant const*>(initializer.get()); - invariant(ec); - invariant(ec->getValue().nullish()); - - return DOC(getOpName() << argument->serialize(explain)); + virtual Document serialize(boost::intrusive_ptr<Expression> expression, bool explain) const { + return DOC(getOpName() << expression->serialize(explain)); } virtual AccumulatorDocumentsNeeded documentsNeeded() const { @@ -148,7 +133,20 @@ private: boost::intrusive_ptr<ExpressionContext> _expCtx; }; -class AccumulatorAddToSet final : public AccumulatorState { +/** + * A default parser for any accumulator that only takes a single expression as an argument. Returns + * the expression to be evaluated by the accumulator and an Accumulator::Factory. + */ +template <class AccName> +std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> +genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx, + BSONElement elem, + VariablesParseState vps) { + auto exprValue = Expression::parseOperand(expCtx, elem, vps); + return {exprValue, [expCtx]() { return AccName::create(expCtx); }}; +} + +class AccumulatorAddToSet final : public Accumulator { public: /** * Creates a new $addToSet accumulator. If no memory limit is given, defaults to the value of @@ -162,7 +160,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); bool isAssociative() const final { @@ -178,7 +176,7 @@ private: int _maxMemUsageBytes; }; -class AccumulatorFirst final : public AccumulatorState { +class AccumulatorFirst final : public Accumulator { public: explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -187,7 +185,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); AccumulatorDocumentsNeeded documentsNeeded() const final { @@ -199,7 +197,7 @@ private: Value _first; }; -class AccumulatorLast final : public AccumulatorState { +class AccumulatorLast final : public Accumulator { public: explicit AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -208,7 +206,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); AccumulatorDocumentsNeeded documentsNeeded() const final { @@ -219,7 +217,7 @@ private: Value _last; }; -class AccumulatorSum final : public AccumulatorState { +class AccumulatorSum final : public Accumulator { public: explicit AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -228,7 +226,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); bool isAssociative() const final { @@ -245,7 +243,7 @@ private: Decimal128 decimalTotal; }; -class AccumulatorMinMax : public AccumulatorState { +class AccumulatorMinMax : public Accumulator { public: enum Sense : int { MIN = 1, @@ -276,7 +274,7 @@ class AccumulatorMax final : public AccumulatorMinMax { public: explicit AccumulatorMax(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorMinMax(expCtx, MAX) {} - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; @@ -284,11 +282,11 @@ class AccumulatorMin final : public AccumulatorMinMax { public: explicit AccumulatorMin(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorMinMax(expCtx, MIN) {} - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; -class AccumulatorPush final : public AccumulatorState { +class AccumulatorPush final : public Accumulator { public: /** * Creates a new $push accumulator. If no memory limit is given, defaults to the value of the @@ -302,7 +300,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: @@ -310,7 +308,7 @@ private: int _maxMemUsageBytes; }; -class AccumulatorAvg final : public AccumulatorState { +class AccumulatorAvg final : public Accumulator { public: explicit AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -319,7 +317,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: @@ -335,7 +333,7 @@ private: long long _count; }; -class AccumulatorStdDev : public AccumulatorState { +class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp); @@ -355,7 +353,7 @@ class AccumulatorStdDevPop final : public AccumulatorStdDev { public: explicit AccumulatorStdDevPop(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorStdDev(expCtx, false) {} - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; @@ -363,11 +361,11 @@ class AccumulatorStdDevSamp final : public AccumulatorStdDev { public: explicit AccumulatorStdDevSamp(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorStdDev(expCtx, true) {} - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; -class AccumulatorMergeObjects : public AccumulatorState { +class AccumulatorMergeObjects : public Accumulator { public: AccumulatorMergeObjects(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -376,7 +374,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp index 7af05eda870..3a6a7b22944 100644 --- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp +++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp @@ -81,7 +81,7 @@ Value AccumulatorAddToSet::getValue(bool toBeMerged) { AccumulatorAddToSet::AccumulatorAddToSet(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<int> maxMemoryUsageBytes) - : AccumulatorState(expCtx), + : Accumulator(expCtx), _set(expCtx->getValueComparator().makeUnorderedValueSet()), _maxMemUsageBytes(maxMemoryUsageBytes.value_or(internalQueryMaxAddToSetBytes.load())) { _memUsageBytes = sizeof(*this); @@ -92,7 +92,7 @@ void AccumulatorAddToSet::reset() { _memUsageBytes = sizeof(*this); } -intrusive_ptr<AccumulatorState> AccumulatorAddToSet::create( +intrusive_ptr<Accumulator> AccumulatorAddToSet::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorAddToSet(expCtx, boost::none); } diff --git a/src/mongo/db/pipeline/accumulator_avg.cpp b/src/mongo/db/pipeline/accumulator_avg.cpp index 1e7f55f5ab3..2efc2cee191 100644 --- a/src/mongo/db/pipeline/accumulator_avg.cpp +++ b/src/mongo/db/pipeline/accumulator_avg.cpp @@ -93,7 +93,7 @@ void AccumulatorAvg::processInternal(const Value& input, bool merging) { _count++; } -intrusive_ptr<AccumulatorState> AccumulatorAvg::create( +intrusive_ptr<Accumulator> AccumulatorAvg::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorAvg(expCtx); } @@ -123,8 +123,8 @@ Value AccumulatorAvg::getValue(bool toBeMerged) { } AccumulatorAvg::AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : AccumulatorState(expCtx), _isDecimal(false), _count(0) { - // This is a fixed size AccumulatorState so we never need to update this + : Accumulator(expCtx), _isDecimal(false), _count(0) { + // This is a fixed size Accumulator so we never need to update this _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp index b48b9e2330e..aed285cf5db 100644 --- a/src/mongo/db/pipeline/accumulator_first.cpp +++ b/src/mongo/db/pipeline/accumulator_first.cpp @@ -59,7 +59,7 @@ Value AccumulatorFirst::getValue(bool toBeMerged) { } AccumulatorFirst::AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : AccumulatorState(expCtx), _haveFirst(false) { + : Accumulator(expCtx), _haveFirst(false) { _memUsageBytes = sizeof(*this); } @@ -70,7 +70,7 @@ void AccumulatorFirst::reset() { } -intrusive_ptr<AccumulatorState> AccumulatorFirst::create( +intrusive_ptr<Accumulator> AccumulatorFirst::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorFirst(expCtx); } diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp index 41183c7ac40..877ac1ca41e 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp +++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp @@ -37,8 +37,10 @@ namespace mongo { REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::parseInternalJsReduce); -AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( - boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) { +std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> +AccumulatorInternalJsReduce::parseInternalJsReduce(boost::intrusive_ptr<ExpressionContext> expCtx, + BSONElement elem, + VariablesParseState vps) { uassert(31326, str::stream() << kAccumulatorName << " requires a document argument, but found " << elem.type(), @@ -46,13 +48,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( BSONObj obj = elem.embeddedObject(); std::string funcSource; - boost::intrusive_ptr<Expression> argument; + boost::intrusive_ptr<Expression> dataExpr; for (auto&& element : obj) { if (element.fieldNameStringData() == "eval") { funcSource = parseReduceFunction(element); } else if (element.fieldNameStringData() == "data") { - argument = Expression::parseOperand(expCtx, element, vps); + dataExpr = Expression::parseOperand(expCtx, element, vps); } else { uasserted(31243, str::stream() << "Invalid argument specified to " << kAccumulatorName << ": " @@ -66,14 +68,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( uassert(31349, str::stream() << kAccumulatorName << " requires 'data' argument, recieved input: " << obj.toString(false), - argument); + dataExpr); auto factory = [expCtx, funcSource = funcSource]() { return AccumulatorInternalJsReduce::create(expCtx, funcSource); }; - auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL)); - return {std::move(initializer), std::move(argument), std::move(factory)}; + return {std::move(dataExpr), std::move(factory)}; } std::string AccumulatorInternalJsReduce::parseReduceFunction(BSONElement func) { @@ -167,7 +168,7 @@ Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) { } } -boost::intrusive_ptr<AccumulatorState> AccumulatorInternalJsReduce::create( +boost::intrusive_ptr<Accumulator> AccumulatorInternalJsReduce::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource) { return make_intrusive<AccumulatorInternalJsReduce>(expCtx, funcSource); @@ -180,233 +181,9 @@ void AccumulatorInternalJsReduce::reset() { } // Returns this accumulator serialized as a Value along with the reduce function. -Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, +Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> expression, bool explain) const { - return DOC(getOpName() << DOC("data" << argument->serialize(explain) << "eval" << _funcSource)); + return DOC( + getOpName() << DOC("data" << expression->serialize(explain) << "eval" << _funcSource)); } - -REGISTER_ACCUMULATOR(accumulator, AccumulatorJs::parse); - -boost::intrusive_ptr<AccumulatorState> AccumulatorJs::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::string init, - std::string accumulate, - std::string merge, - std::string finalize) { - return new AccumulatorJs( - expCtx, std::move(init), std::move(accumulate), std::move(merge), std::move(finalize)); -} - -namespace { -// Parses a constant expression of type String or Code. -std::string parseFunction(StringData fieldName, - boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps) { - boost::intrusive_ptr<Expression> expr = Expression::parseOperand(expCtx, elem, vps); - expr = expr->optimize(); - ExpressionConstant* ec = dynamic_cast<ExpressionConstant*>(expr.get()); - uassert(4544701, - str::stream() << "$accumulator '" << fieldName << "' must be a constant expression", - ec); - Value v = ec->getValue(); - uassert(4544702, - str::stream() << "$accumulator '" << fieldName << "' must be a String or Code", - v.getType() == BSONType::String || v.getType() == BSONType::Code); - return v.coerceToString(); -} -} // namespace - - -Document AccumulatorJs::serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, - bool explain) const { - MutableDocument args; - args.addField("init", Value(_init)); - args.addField("initArgs", Value(initializer->serialize(explain))); - args.addField("accumulate", Value(_accumulate)); - args.addField("accumulateArgs", Value(argument->serialize(explain))); - args.addField("merge", Value(_merge)); - args.addField("finalize", Value(_finalize)); - args.addField("lang", Value("js"_sd)); - return DOC(getOpName() << args.freeze()); -} - -AccumulationExpression AccumulatorJs::parse(boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps) { - /* - * {$accumulator: { - * init: <code>, - * accumulate: <code>, - * merge: <code>, - * finalize: <code>, - * - * accumulateArgs: <expr>, // evaluated once per document - * - * initArgs: <expr>, // evaluated once per group - * - * lang: 'js', - * }} - */ - uassert(4544703, - str::stream() << "$accumulator expects an object as an argument; found: " - << typeName(elem.type()), - elem.type() == BSONType::Object); - BSONObj obj = elem.embeddedObject(); - - std::string init, accumulate, merge, finalize; - boost::intrusive_ptr<Expression> initArgs, accumulateArgs; - - for (auto&& element : obj) { - auto name = element.fieldNameStringData(); - if (name == "init") { - init = parseFunction("init", expCtx, element, vps); - } else if (name == "accumulate") { - accumulate = parseFunction("accumulate", expCtx, element, vps); - } else if (name == "merge") { - merge = parseFunction("merge", expCtx, element, vps); - } else if (name == "finalize") { - finalize = parseFunction("finalize", expCtx, element, vps); - } else if (name == "initArgs") { - initArgs = Expression::parseOperand(expCtx, element, vps); - } else if (name == "accumulateArgs") { - accumulateArgs = Expression::parseOperand(expCtx, element, vps); - } else if (name == "lang") { - uassert(4544704, - str::stream() << "$accumulator lang must be a string; found: " - << element.type(), - element.type() == BSONType::String); - uassert(4544705, - "$accumulator only supports lang: 'js'", - element.valueStringData() == "js"); - } else { - // unexpected field - uassert( - 4544706, str::stream() << "$accumulator got an unexpected field: " << name, false); - } - } - uassert(4544707, "$accumulator missing required argument 'init'", !init.empty()); - uassert(4544708, "$accumulator missing required argument 'accumulate'", !accumulate.empty()); - uassert(4544709, "$accumulator missing required argument 'merge'", !merge.empty()); - if (finalize.empty()) { - // finalize is optional because many custom accumulators will return the final state - // unchanged. - finalize = "function(state) { return state; }"; - } - if (!initArgs) { - // initArgs is optional because most custom accumulators don't need the state to depend on - // the group key. - initArgs = ExpressionConstant::create(expCtx, Value(BSONArray())); - } - // accumulateArgs is required because it's the only way to communicate a value from the input - // stream into the accumulator state. - uassert(4544710, "$accumulator missing required argument 'accumulateArgs'", accumulateArgs); - - auto factory = [expCtx = expCtx, - init = std::move(init), - accumulate = std::move(accumulate), - merge = std::move(merge), - finalize = std::move(finalize)]() { - return new AccumulatorJs(expCtx, init, accumulate, merge, finalize); - }; - return {std::move(initArgs), std::move(accumulateArgs), std::move(factory)}; -} - -Value AccumulatorJs::getValue(bool toBeMerged) { - // _state is initialized when we encounter the first document in each group. We never create - // empty groups: even in a {$group: {_id: 1, ...}}, we will return zero groups rather than one - // empty group. - invariant(_state); - - // If toBeMerged then we return the current state, to be fed back in to accumulate / merge / - // finalize later. If not toBeMerged then we return the final value, by calling finalize. - if (toBeMerged) { - return *_state; - } - - // Get the final value given the current accumulator state. - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - auto func = makeJsFunc(expCtx, _finalize); - - return jsExec->callFunction(func, BSON_ARRAY(*_state), {}); -} - -void AccumulatorJs::startNewGroup(Value const& input) { - // Between groups the _state should be empty: we initialize it to be empty it in the - // constructor, and we clear it at the end of each group (in .reset()). - invariant(!_state); - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - auto func = makeJsFunc(expCtx, _init); - - // input is a value produced by our AccumulationExpression::initializer. - uassert(4544711, - str::stream() << "$accumulator initArgs must evaluate to an array: " - << input.toString(), - input.getType() == BSONType::Array); - - size_t index = 0; - BSONArrayBuilder bob; - for (auto&& arg : input.getArray()) { - arg.addToBsonArray(&bob, index++); - } - - _state = jsExec->callFunction(func, bob.arr(), {}); - - recomputeMemUsageBytes(); -} - -void AccumulatorJs::reset() { - _state = std::nullopt; - recomputeMemUsageBytes(); -} - -void AccumulatorJs::processInternal(const Value& input, bool merging) { - // _state should be nonempty because we populate it in startNewGroup. - invariant(_state); - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - - if (merging) { - // input is an intermediate state from another instance of this kind of accumulator. Call - // the user's merge function. - auto func = makeJsFunc(expCtx, _merge); - _state = jsExec->callFunction(func, BSON_ARRAY(*_state << input), {}); - recomputeMemUsageBytes(); - } else { - // input is a value produced by our AccumulationExpression::argument. Call the user's - // accumulate function. - auto func = makeJsFunc(expCtx, _accumulate); - uassert(4544712, - str::stream() << "$accumulator accumulateArgs must evaluate to an array: " - << input.toString(), - input.getType() == BSONType::Array); - - size_t index = 0; - BSONArrayBuilder bob; - _state->addToBsonArray(&bob, index++); - for (auto&& arg : input.getArray()) { - arg.addToBsonArray(&bob, index++); - } - - _state = jsExec->callFunction(func, bob.done(), {}); - recomputeMemUsageBytes(); - } -} - -void AccumulatorJs::recomputeMemUsageBytes() { - auto stateSize = _state.value_or(Value{}).getApproximateSize(); - uassert(4544713, - str::stream() << "$accumulator state exceeded max BSON size: " << stateSize, - stateSize <= BSONObjMaxUserSize); - _memUsageBytes = sizeof(*this) + stateSize + _init.capacity() + _accumulate.capacity() + - _merge.capacity() + _finalize.capacity(); -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.h b/src/mongo/db/pipeline/accumulator_js_reduce.h index 1bb15f948e8..bc2725a5567 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.h +++ b/src/mongo/db/pipeline/accumulator_js_reduce.h @@ -38,19 +38,19 @@ namespace mongo { -class AccumulatorInternalJsReduce final : public AccumulatorState { +class AccumulatorInternalJsReduce final : public Accumulator { public: static constexpr auto kAccumulatorName = "$_internalJsReduce"_sd; - static boost::intrusive_ptr<AccumulatorState> create( + static boost::intrusive_ptr<Accumulator> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource); - static AccumulationExpression parseInternalJsReduce( + static std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> parseInternalJsReduce( boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps); AccumulatorInternalJsReduce(const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource) - : AccumulatorState(expCtx), _funcSource(funcSource) { + : Accumulator(expCtx), _funcSource(funcSource) { _memUsageBytes = sizeof(*this); } @@ -64,8 +64,7 @@ public: void reset() final; - virtual Document serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, + virtual Document serialize(boost::intrusive_ptr<Expression> expression, bool explain) const override; private: @@ -76,59 +75,4 @@ private: Value _key; }; -class AccumulatorJs final : public AccumulatorState { -public: - static constexpr auto kAccumulatorName = "$accumulator"_sd; - const char* getOpName() const final { - return kAccumulatorName.rawData(); - } - - // An AccumulatorState instance only owns its "static" arguments: those that don't need to be - // evaluated per input document. - static boost::intrusive_ptr<AccumulatorState> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::string init, - std::string accumulate, - std::string merge, - std::string finalize); - - static AccumulationExpression parse(boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps); - - Value getValue(bool toBeMerged) final; - void reset() final; - void processInternal(const Value& input, bool merging) final; - - Document serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, - bool explain) const final; - void startNewGroup(Value const& input) final; - -private: - AccumulatorJs(const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::string init, - std::string accumulate, - std::string merge, - std::string finalize) - : AccumulatorState(expCtx), - _init(init), - _accumulate(accumulate), - _merge(merge), - _finalize(finalize) { - recomputeMemUsageBytes(); - } - void recomputeMemUsageBytes(); - - // static arguments - std::string _init, _accumulate, _merge, _finalize; - - // accumulator state during execution - // - When the accumulator is first created, _state is empty. - // - When the accumulator is fed its first input Value, it runs the user init and accumulate - // functions, and _state gets a Value. - // - When the accumulator is reset, _state becomes empty again. - std::optional<Value> _state; -}; - } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_last.cpp b/src/mongo/db/pipeline/accumulator_last.cpp index 14362e42cab..150360d4fdd 100644 --- a/src/mongo/db/pipeline/accumulator_last.cpp +++ b/src/mongo/db/pipeline/accumulator_last.cpp @@ -55,7 +55,7 @@ Value AccumulatorLast::getValue(bool toBeMerged) { } AccumulatorLast::AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : AccumulatorState(expCtx) { + : Accumulator(expCtx) { _memUsageBytes = sizeof(*this); } @@ -64,7 +64,7 @@ void AccumulatorLast::reset() { _last = Value(); } -intrusive_ptr<AccumulatorState> AccumulatorLast::create( +intrusive_ptr<Accumulator> AccumulatorLast::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorLast(expCtx); } diff --git a/src/mongo/db/pipeline/accumulator_merge_objects.cpp b/src/mongo/db/pipeline/accumulator_merge_objects.cpp index 6b23ae528a1..d1e6310ea23 100644 --- a/src/mongo/db/pipeline/accumulator_merge_objects.cpp +++ b/src/mongo/db/pipeline/accumulator_merge_objects.cpp @@ -49,14 +49,14 @@ const char* AccumulatorMergeObjects::getOpName() const { return "$mergeObjects"; } -intrusive_ptr<AccumulatorState> AccumulatorMergeObjects::create( +intrusive_ptr<Accumulator> AccumulatorMergeObjects::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorMergeObjects(expCtx); } AccumulatorMergeObjects::AccumulatorMergeObjects( const boost::intrusive_ptr<ExpressionContext>& expCtx) - : AccumulatorState(expCtx) { + : Accumulator(expCtx) { _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp index 265a84b7075..c1af0ff8226 100644 --- a/src/mongo/db/pipeline/accumulator_min_max.cpp +++ b/src/mongo/db/pipeline/accumulator_min_max.cpp @@ -71,7 +71,7 @@ Value AccumulatorMinMax::getValue(bool toBeMerged) { AccumulatorMinMax::AccumulatorMinMax(const boost::intrusive_ptr<ExpressionContext>& expCtx, Sense sense) - : AccumulatorState(expCtx), _sense(sense) { + : Accumulator(expCtx), _sense(sense) { _memUsageBytes = sizeof(*this); } @@ -80,12 +80,12 @@ void AccumulatorMinMax::reset() { _memUsageBytes = sizeof(*this); } -intrusive_ptr<AccumulatorState> AccumulatorMin::create( +intrusive_ptr<Accumulator> AccumulatorMin::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorMin(expCtx); } -intrusive_ptr<AccumulatorState> AccumulatorMax::create( +intrusive_ptr<Accumulator> AccumulatorMax::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorMax(expCtx); } diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp index 3a004dd9731..d13a942ef62 100644 --- a/src/mongo/db/pipeline/accumulator_push.cpp +++ b/src/mongo/db/pipeline/accumulator_push.cpp @@ -83,7 +83,7 @@ Value AccumulatorPush::getValue(bool toBeMerged) { AccumulatorPush::AccumulatorPush(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<int> maxMemoryUsageBytes) - : AccumulatorState(expCtx), + : Accumulator(expCtx), _maxMemUsageBytes(maxMemoryUsageBytes.value_or(internalQueryMaxPushBytes.load())) { _memUsageBytes = sizeof(*this); } @@ -93,7 +93,7 @@ void AccumulatorPush::reset() { _memUsageBytes = sizeof(*this); } -intrusive_ptr<AccumulatorState> AccumulatorPush::create( +intrusive_ptr<Accumulator> AccumulatorPush::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorPush(expCtx, boost::none); } diff --git a/src/mongo/db/pipeline/accumulator_std_dev.cpp b/src/mongo/db/pipeline/accumulator_std_dev.cpp index cdd31b2c897..55367d766be 100644 --- a/src/mongo/db/pipeline/accumulator_std_dev.cpp +++ b/src/mongo/db/pipeline/accumulator_std_dev.cpp @@ -96,20 +96,20 @@ Value AccumulatorStdDev::getValue(bool toBeMerged) { } } -intrusive_ptr<AccumulatorState> AccumulatorStdDevSamp::create( +intrusive_ptr<Accumulator> AccumulatorStdDevSamp::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorStdDevSamp(expCtx); } -intrusive_ptr<AccumulatorState> AccumulatorStdDevPop::create( +intrusive_ptr<Accumulator> AccumulatorStdDevPop::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorStdDevPop(expCtx); } AccumulatorStdDev::AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp) - : AccumulatorState(expCtx), _isSamp(isSamp), _count(0), _mean(0), _m2(0) { - // This is a fixed size AccumulatorState so we never need to update this + : Accumulator(expCtx), _isSamp(isSamp), _count(0), _mean(0), _m2(0) { + // This is a fixed size Accumulator so we never need to update this _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_sum.cpp b/src/mongo/db/pipeline/accumulator_sum.cpp index 182f592ce3f..6cd34d8c76f 100644 --- a/src/mongo/db/pipeline/accumulator_sum.cpp +++ b/src/mongo/db/pipeline/accumulator_sum.cpp @@ -85,7 +85,7 @@ void AccumulatorSum::processInternal(const Value& input, bool merging) { } } -intrusive_ptr<AccumulatorState> AccumulatorSum::create( +intrusive_ptr<Accumulator> AccumulatorSum::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { return new AccumulatorSum(expCtx); } @@ -128,8 +128,8 @@ Value AccumulatorSum::getValue(bool toBeMerged) { } AccumulatorSum::AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : AccumulatorState(expCtx) { - // This is a fixed size AccumulatorState so we never need to update this. + : Accumulator(expCtx) { + // This is a fixed size Accumulator so we never need to update this. _memUsageBytes = sizeof(*this); } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 724e6a6838a..373a24a4155 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -46,9 +46,9 @@ using std::numeric_limits; using std::string; /** - * Takes the name of an AccumulatorState as its template argument and a list of pairs of arguments - * and expected results as its second argument, and asserts that for the given AccumulatorState the - * arguments evaluate to the expected results. + * Takes the name of an Accumulator as its template argument and a list of pairs of arguments and + * expected results as its second argument, and asserts that for the given Accumulator the arguments + * evaluate to the expected results. */ template <typename AccName> static void assertExpectedResults( diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index c6decbca69c..eba38c5c58b 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -109,25 +109,13 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::doGetNext() { return makeDocument(*(_bucketsIterator++)); } -boost::intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::optimize() { - _groupByExpression = _groupByExpression->optimize(); - for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expr.argument = accumulatedField.expr.argument->optimize(); - accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize(); - } - return this; -} - DepsTracker::State DocumentSourceBucketAuto::getDependencies(DepsTracker* deps) const { // Add the 'groupBy' expression. _groupByExpression->addDependencies(deps); // Add the 'output' fields. for (auto&& accumulatedField : _accumulatedFields) { - // Anything the per-doc expression depends on, the whole stage depends on. - accumulatedField.expr.argument->addDependencies(deps); - // The initializer should be an ExpressionConstant, or something that optimizes to one. - // ExpressionConstant doesn't have dependencies. + accumulatedField.expression->addDependencies(deps); } // We know exactly which fields will be present in the output document. Future stages cannot @@ -201,8 +189,7 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>& const size_t numAccumulators = _accumulatedFields.size(); for (size_t k = 0; k < numAccumulators; k++) { bucket._accums[k]->process( - _accumulatedFields[k].expr.argument->evaluate(entry.second, &pExpCtx->variables), - false); + _accumulatedFields[k].expression->evaluate(entry.second, &pExpCtx->variables), false); } } @@ -247,16 +234,6 @@ void DocumentSourceBucketAuto::populateBuckets() { // Initialize the current bucket. Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields); - // Evaluate each initializer against an empty document. Normally the - // initializer can refer to the group key, but in $bucketAuto there is no single - // group key per bucket. - Document emptyDoc; - for (size_t k = 0; k < _accumulatedFields.size(); ++k) { - Value initializerValue = - _accumulatedFields[k].expr.initializer->evaluate(emptyDoc, &pExpCtx->variables); - currentBucket._accums[k]->startNewGroup(initializerValue); - } - // Add the first value into the current bucket. addDocumentToBucket(currentValue, currentBucket); @@ -405,11 +382,10 @@ Value DocumentSourceBucketAuto::serialize( MutableDocument outputSpec(_accumulatedFields.size()); for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator(); + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(); outputSpec[accumulatedField.fieldName] = - Value(accum->serialize(accumulatedField.expr.initializer, - accumulatedField.expr.argument, - static_cast<bool>(explain))); + Value{Document{{accum->getOpName(), + accumulatedField.expression->serialize(static_cast<bool>(explain))}}}; } insides["output"] = outputSpec.freezeToValue(); @@ -429,11 +405,9 @@ intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create( numBuckets > 0); // If there is no output field specified, then add the default one. if (accumulationStatements.empty()) { - accumulationStatements.emplace_back( - "count", - AccumulationExpression(ExpressionConstant::create(pExpCtx, Value(BSONNULL)), - ExpressionConstant::create(pExpCtx, Value(1)), - [pExpCtx] { return AccumulatorSum::create(pExpCtx); })); + accumulationStatements.emplace_back("count", + ExpressionConstant::create(pExpCtx, Value(1)), + [pExpCtx] { return AccumulatorSum::create(pExpCtx); }); } return new DocumentSourceBucketAuto(pExpCtx, groupByExpression, @@ -512,13 +486,8 @@ intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( argument.type() == BSONType::Object); for (auto&& outputField : argument.embeddedObject()) { - auto stmt = - AccumulationStatement::parseAccumulationStatement(pExpCtx, outputField, vps); - stmt.expr.initializer = stmt.expr.initializer->optimize(); - uassert(4544714, - "Can't refer to the group key in $bucketAuto", - ExpressionConstant::isNullOrConstant(stmt.expr.initializer)); - accumulationStatements.push_back(std::move(stmt)); + accumulationStatements.push_back( + AccumulationStatement::parseAccumulationStatement(pExpCtx, outputField, vps)); } } else if ("granularity" == argName) { uassert(40261, diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index fcc35a130d3..8804b0df6c1 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -49,7 +49,6 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; const char* getSourceName() const final; - boost::intrusive_ptr<DocumentSource> optimize() final; StageConstraints constraints(Pipeline::SplitState pipeState) const final { return {StreamType::kBlocking, @@ -115,7 +114,7 @@ private: const std::vector<AccumulationStatement>& accumulationStatements); Value _min; Value _max; - std::vector<boost::intrusive_ptr<AccumulatorState>> _accums; + std::vector<boost::intrusive_ptr<Accumulator>> _accums; }; /** diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 8268f0a1dd1..0f961f8c363 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -158,17 +158,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() { _currentId = _firstPartOfNextGroup.first; const size_t numAccumulators = _accumulatedFields.size(); - - // Call startNewGroup on every accumulator. - Value expandedId = expandId(_currentId); - Document idDoc = - expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); - for (size_t i = 0; i < numAccumulators; ++i) { - Value initializerValue = - _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables); - _currentAccumulators[i]->startNewGroup(initializerValue); - } - while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) { // Inside of this loop, _firstPartOfNextGroup is the current data being processed. // At loop exit, it is the first value to be processed in the next group. @@ -227,8 +216,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { } for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize(); - accumulatedField.expr.argument = accumulatedField.expr.argument->optimize(); + accumulatedField.expression = accumulatedField.expression->optimize(); } return this; @@ -253,11 +241,9 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> // Add the remaining fields. for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator(); + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(); insides[accumulatedField.fieldName] = - Value(accum->serialize(accumulatedField.expr.initializer, - accumulatedField.expr.argument, - static_cast<bool>(explain))); + Value(accum->serialize(accumulatedField.expression, static_cast<bool>(explain))); } if (_doingMerge) { @@ -277,8 +263,7 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const // add the rest for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expr.argument->addDependencies(deps); - // Don't add initializer, because it doesn't refer to docs from the input stream. + accumulatedField.expression->addDependencies(deps); } return DepsTracker::State::EXHAUSTIVE_ALL; @@ -500,23 +485,16 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and // looking it up in '_groups' multiple times. const size_t oldSize = _groups->size(); - vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id]; + vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id]; const bool inserted = _groups->size() != oldSize; if (inserted) { _memoryUsageBytes += id.getApproximateSize(); - // Initialize and add the accumulators - Value expandedId = expandId(id); - Document idDoc = - expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); + // Add the accumulators group.reserve(numAccumulators); for (auto&& accumulatedField : _accumulatedFields) { - auto accum = accumulatedField.makeAccumulator(); - Value initializerValue = - accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables); - accum->startNewGroup(initializerValue); - group.push_back(accum); + group.push_back(accumulatedField.makeAccumulator()); } } else { for (auto&& groupObj : group) { @@ -530,7 +508,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { for (size_t i = 0; i < numAccumulators; i++) { group[i]->process( - _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables), + _accumulatedFields[i].expression->evaluate(rootDocument, &pExpCtx->variables), _doingMerge); _memoryUsageBytes += group[i]->memUsageForSorter(); @@ -715,7 +693,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr // original accumulator may be collecting an expression based on a field expression or // constant. Here, we accumulate the output of the same name from the prior group. auto copiedAccumulatedField = accumulatedField; - copiedAccumulatedField.expr.argument = + copiedAccumulatedField.expression = ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + copiedAccumulatedField.fieldName, vps); mergingGroup->addAccumulator(copiedAccumulatedField); } @@ -797,10 +775,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { fields.push_back(std::make_pair("_id", ExpressionFieldPath::create(pExpCtx, groupId))); for (auto&& accumulator : _accumulatedFields) { - fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument)); - - // Since we don't attempt this transformation for non-$first accumulators, - // the initializer should always be trivial. + fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expression)); } return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields)); diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 5d885e84a1f..88b0a669b6e 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -88,7 +88,7 @@ private: class DocumentSourceGroup final : public DocumentSource { public: - using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>; + using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>; using GroupsMap = ValueUnorderedMap<Accumulators>; static constexpr StringData kStageName = "$group"_sd; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index c3c764c4f6a..a2ed4358248 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -75,8 +75,9 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) { // This is the only way to do this in a debug build. auto&& parser = AccumulationStatement::getParser("$sum"); auto accumulatorArg = BSON("" << 1); - auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); - AccumulationStatement countStatement{"count", accExpr}; + auto [expression, factory] = + parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); + AccumulationStatement countStatement{"count", expression, factory}; auto group = DocumentSourceGroup::create( expCtx, ExpressionConstant::create(expCtx, Value(BSONNULL)), {countStatement}); auto mock = @@ -112,8 +113,9 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { auto&& parser = AccumulationStatement::getParser("$push"); auto accumulatorArg = BSON("" << "$largeStr"); - auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); - AccumulationStatement pushStatement{"spaceHog", accExpr}; + auto [expression, factory] = + parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); + AccumulationStatement pushStatement{"spaceHog", expression, factory}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState); auto group = DocumentSourceGroup::create( @@ -154,8 +156,9 @@ TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSet auto&& parser = AccumulationStatement::getParser("$push"); auto accumulatorArg = BSON("" << "$largeStr"); - auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); - AccumulationStatement pushStatement{"spaceHog", accExpr}; + auto [expression, factory] = + parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); + AccumulationStatement pushStatement{"spaceHog", expression, factory}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState); auto group = DocumentSourceGroup::create( @@ -178,8 +181,9 @@ TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { auto&& parser = AccumulationStatement::getParser("$push"); auto accumulatorArg = BSON("" << "$largeStr"); - auto accExpr = parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); - AccumulationStatement pushStatement{"spaceHog", accExpr}; + auto [expression, factory] = + parser(expCtx, accumulatorArg.firstElement(), expCtx->variablesParseState); + AccumulationStatement pushStatement{"spaceHog", expression, factory}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState); auto group = DocumentSourceGroup::create( diff --git a/src/mongo/db/pipeline/expression.h b/src/mongo/db/pipeline/expression.h index 4f772ca2363..b05b4dcd918 100644 --- a/src/mongo/db/pipeline/expression.h +++ b/src/mongo/db/pipeline/expression.h @@ -399,15 +399,15 @@ public: * Used to make Accumulators available as Expressions, e.g., to make $sum available as an Expression * use "REGISTER_EXPRESSION(sum, ExpressionAccumulator<AccumulatorSum>::parse);". */ -template <typename AccumulatorState> +template <typename Accumulator> class ExpressionFromAccumulator - : public ExpressionVariadic<ExpressionFromAccumulator<AccumulatorState>> { + : public ExpressionVariadic<ExpressionFromAccumulator<Accumulator>> { public: explicit ExpressionFromAccumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) - : ExpressionVariadic<ExpressionFromAccumulator<AccumulatorState>>(expCtx) {} + : ExpressionVariadic<ExpressionFromAccumulator<Accumulator>>(expCtx) {} Value evaluate(const Document& root, Variables* variables) const final { - AccumulatorState accum(this->getExpressionContext()); + Accumulator accum(this->getExpressionContext()); const auto n = this->_children.size(); // If a single array arg is given, loop through it passing each member to the accumulator. // If a single, non-array arg is given, pass it directly to the accumulator. @@ -435,15 +435,15 @@ public: if (this->_children.size() == 1) { return false; } - return AccumulatorState(this->getExpressionContext()).isAssociative(); + return Accumulator(this->getExpressionContext()).isAssociative(); } bool isCommutative() const final { - return AccumulatorState(this->getExpressionContext()).isCommutative(); + return Accumulator(this->getExpressionContext()).isCommutative(); } const char* getOpName() const final { - return AccumulatorState(this->getExpressionContext()).getOpName(); + return Accumulator(this->getExpressionContext()).getOpName(); } void acceptVisitor(ExpressionVisitor* visitor) final { diff --git a/src/mongo/db/pipeline/expression_visitor.h b/src/mongo/db/pipeline/expression_visitor.h index 393778e5f21..326c8e2e6ce 100644 --- a/src/mongo/db/pipeline/expression_visitor.h +++ b/src/mongo/db/pipeline/expression_visitor.h @@ -158,7 +158,7 @@ class AccumulatorStdDevPop; class AccumulatorStdDevSamp; class AccumulatorSum; class AccumulatorMergeObjects; -template <typename AccumulatorState> +template <typename Accumulator> class ExpressionFromAccumulator; /** |