diff options
Diffstat (limited to 'src/mongo/db/pipeline/accumulator.h')
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 90 |
1 files changed, 46 insertions, 44 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index cb58bc7a3b1..e59ccb543da 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -51,23 +51,28 @@ namespace mongo { * This enum indicates which documents an accumulator needs to see in order to compute its output. */ enum class AccumulatorDocumentsNeeded { - // Accumulator needs to see all documents in a group. + // AccumulatorState needs to see all documents in a group. kAllDocuments, - // Accumulator only needs to see one document in a group, and when there is a sort order, that - // document must be the first document. + // AccumulatorState only needs to see one document in a group, and when there is a sort order, + // that document must be the first document. kFirstDocument, - // Accumulator only needs to see one document in a group, and when there is a sort order, that - // document must be the last document. + // AccumulatorState only needs to see one document in a group, and when there is a sort order, + // that document must be the last document. kLastDocument, }; -class Accumulator : public RefCountable { +class AccumulatorState : public RefCountable { public: - using Factory = std::function<boost::intrusive_ptr<Accumulator>()>; + using Factory = std::function<boost::intrusive_ptr<AccumulatorState>()>; - Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} + AccumulatorState(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} + + /** Marks the beginning of a new group. The input is the result of evaluating + * AccumulatorExpression::initializer, which can read from the group key. + */ + virtual void startNewGroup(const Value& input) {} /** Process input and update internal state. * merging should be true when processing outputs from getValue(true). @@ -89,7 +94,7 @@ public: return _memUsageBytes; } - /// Reset this accumulator to a fresh state ready to receive input. + /// Reset this accumulator to a fresh state, ready for a new call to startNewGroup. virtual void reset() = 0; virtual bool isAssociative() const { @@ -109,9 +114,19 @@ public: * * When executing on a sharded cluster, the result of this function will be sent to each * individual shard. + * + * This implementation assumes the accumulator has the simple syntax { <name>: <argument> }, + * such as { $sum: <argument> }. This syntax has no room for an initializer. Subclasses with a + * more elaborate syntax such should override this method. */ - virtual Document serialize(boost::intrusive_ptr<Expression> expression, bool explain) const { - return DOC(getOpName() << expression->serialize(explain)); + virtual Document serialize(boost::intrusive_ptr<Expression> initializer, + boost::intrusive_ptr<Expression> argument, + bool explain) const { + ExpressionConstant const* ec = dynamic_cast<ExpressionConstant const*>(initializer.get()); + invariant(ec); + invariant(ec->getValue().nullish()); + + return DOC(getOpName() << argument->serialize(explain)); } virtual AccumulatorDocumentsNeeded documentsNeeded() const { @@ -133,20 +148,7 @@ private: boost::intrusive_ptr<ExpressionContext> _expCtx; }; -/** - * A default parser for any accumulator that only takes a single expression as an argument. Returns - * the expression to be evaluated by the accumulator and an Accumulator::Factory. - */ -template <class AccName> -std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> -genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps) { - auto exprValue = Expression::parseOperand(expCtx, elem, vps); - return {exprValue, [expCtx]() { return AccName::create(expCtx); }}; -} - -class AccumulatorAddToSet final : public Accumulator { +class AccumulatorAddToSet final : public AccumulatorState { public: /** * Creates a new $addToSet accumulator. If no memory limit is given, defaults to the value of @@ -160,7 +162,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); bool isAssociative() const final { @@ -176,7 +178,7 @@ private: int _maxMemUsageBytes; }; -class AccumulatorFirst final : public Accumulator { +class AccumulatorFirst final : public AccumulatorState { public: explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -185,7 +187,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); AccumulatorDocumentsNeeded documentsNeeded() const final { @@ -197,7 +199,7 @@ private: Value _first; }; -class AccumulatorLast final : public Accumulator { +class AccumulatorLast final : public AccumulatorState { public: explicit AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -206,7 +208,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); AccumulatorDocumentsNeeded documentsNeeded() const final { @@ -217,7 +219,7 @@ private: Value _last; }; -class AccumulatorSum final : public Accumulator { +class AccumulatorSum final : public AccumulatorState { public: explicit AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -226,7 +228,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); bool isAssociative() const final { @@ -243,7 +245,7 @@ private: Decimal128 decimalTotal; }; -class AccumulatorMinMax : public Accumulator { +class AccumulatorMinMax : public AccumulatorState { public: enum Sense : int { MIN = 1, @@ -274,7 +276,7 @@ class AccumulatorMax final : public AccumulatorMinMax { public: explicit AccumulatorMax(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorMinMax(expCtx, MAX) {} - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; @@ -282,11 +284,11 @@ class AccumulatorMin final : public AccumulatorMinMax { public: explicit AccumulatorMin(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorMinMax(expCtx, MIN) {} - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; -class AccumulatorPush final : public Accumulator { +class AccumulatorPush final : public AccumulatorState { public: /** * Creates a new $push accumulator. If no memory limit is given, defaults to the value of the @@ -300,7 +302,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: @@ -308,7 +310,7 @@ private: int _maxMemUsageBytes; }; -class AccumulatorAvg final : public Accumulator { +class AccumulatorAvg final : public AccumulatorState { public: explicit AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -317,7 +319,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: @@ -333,7 +335,7 @@ private: long long _count; }; -class AccumulatorStdDev : public Accumulator { +class AccumulatorStdDev : public AccumulatorState { public: AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp); @@ -353,7 +355,7 @@ class AccumulatorStdDevPop final : public AccumulatorStdDev { public: explicit AccumulatorStdDevPop(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorStdDev(expCtx, false) {} - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; @@ -361,11 +363,11 @@ class AccumulatorStdDevSamp final : public AccumulatorStdDev { public: explicit AccumulatorStdDevSamp(const boost::intrusive_ptr<ExpressionContext>& expCtx) : AccumulatorStdDev(expCtx, true) {} - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); }; -class AccumulatorMergeObjects : public Accumulator { +class AccumulatorMergeObjects : public AccumulatorState { public: AccumulatorMergeObjects(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -374,7 +376,7 @@ public: const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr<Accumulator> create( + static boost::intrusive_ptr<AccumulatorState> create( const boost::intrusive_ptr<ExpressionContext>& expCtx); private: |