summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/accumulator.h
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2020-01-17 16:20:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-27 20:44:41 +0000
commit606fbf8eac896b0b4ed26e921b7f6bf1f73f5511 (patch)
tree4855ab6890e429ff79ffdf867d2b973361b62b00 /src/mongo/db/pipeline/accumulator.h
parent5e57c0b0f7505035c37179d100fdd43ef2b6cc36 (diff)
downloadmongo-606fbf8eac896b0b4ed26e921b7f6bf1f73f5511.tar.gz
SERVER-45447 Add $accumulator for user-defined Javascript accumulators
Diffstat (limited to 'src/mongo/db/pipeline/accumulator.h')
-rw-r--r--src/mongo/db/pipeline/accumulator.h90
1 files changed, 46 insertions, 44 deletions
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index cb58bc7a3b1..e59ccb543da 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -51,23 +51,28 @@ namespace mongo {
* This enum indicates which documents an accumulator needs to see in order to compute its output.
*/
enum class AccumulatorDocumentsNeeded {
- // Accumulator needs to see all documents in a group.
+ // AccumulatorState needs to see all documents in a group.
kAllDocuments,
- // Accumulator only needs to see one document in a group, and when there is a sort order, that
- // document must be the first document.
+ // AccumulatorState only needs to see one document in a group, and when there is a sort order,
+ // that document must be the first document.
kFirstDocument,
- // Accumulator only needs to see one document in a group, and when there is a sort order, that
- // document must be the last document.
+ // AccumulatorState only needs to see one document in a group, and when there is a sort order,
+ // that document must be the last document.
kLastDocument,
};
-class Accumulator : public RefCountable {
+class AccumulatorState : public RefCountable {
public:
- using Factory = std::function<boost::intrusive_ptr<Accumulator>()>;
+ using Factory = std::function<boost::intrusive_ptr<AccumulatorState>()>;
- Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
+ AccumulatorState(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
+
+ /** Marks the beginning of a new group. The input is the result of evaluating
+ * AccumulatorExpression::initializer, which can read from the group key.
+ */
+ virtual void startNewGroup(const Value& input) {}
/** Process input and update internal state.
* merging should be true when processing outputs from getValue(true).
@@ -89,7 +94,7 @@ public:
return _memUsageBytes;
}
- /// Reset this accumulator to a fresh state ready to receive input.
+ /// Reset this accumulator to a fresh state, ready for a new call to startNewGroup.
virtual void reset() = 0;
virtual bool isAssociative() const {
@@ -109,9 +114,19 @@ public:
*
* When executing on a sharded cluster, the result of this function will be sent to each
* individual shard.
+ *
+ * This implementation assumes the accumulator has the simple syntax { <name>: <argument> },
+ * such as { $sum: <argument> }. This syntax has no room for an initializer. Subclasses with a
+ * more elaborate syntax such should override this method.
*/
- virtual Document serialize(boost::intrusive_ptr<Expression> expression, bool explain) const {
- return DOC(getOpName() << expression->serialize(explain));
+ virtual Document serialize(boost::intrusive_ptr<Expression> initializer,
+ boost::intrusive_ptr<Expression> argument,
+ bool explain) const {
+ ExpressionConstant const* ec = dynamic_cast<ExpressionConstant const*>(initializer.get());
+ invariant(ec);
+ invariant(ec->getValue().nullish());
+
+ return DOC(getOpName() << argument->serialize(explain));
}
virtual AccumulatorDocumentsNeeded documentsNeeded() const {
@@ -133,20 +148,7 @@ private:
boost::intrusive_ptr<ExpressionContext> _expCtx;
};
-/**
- * A default parser for any accumulator that only takes a single expression as an argument. Returns
- * the expression to be evaluated by the accumulator and an Accumulator::Factory.
- */
-template <class AccName>
-std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>
-genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps) {
- auto exprValue = Expression::parseOperand(expCtx, elem, vps);
- return {exprValue, [expCtx]() { return AccName::create(expCtx); }};
-}
-
-class AccumulatorAddToSet final : public Accumulator {
+class AccumulatorAddToSet final : public AccumulatorState {
public:
/**
* Creates a new $addToSet accumulator. If no memory limit is given, defaults to the value of
@@ -160,7 +162,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
bool isAssociative() const final {
@@ -176,7 +178,7 @@ private:
int _maxMemUsageBytes;
};
-class AccumulatorFirst final : public Accumulator {
+class AccumulatorFirst final : public AccumulatorState {
public:
explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -185,7 +187,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
AccumulatorDocumentsNeeded documentsNeeded() const final {
@@ -197,7 +199,7 @@ private:
Value _first;
};
-class AccumulatorLast final : public Accumulator {
+class AccumulatorLast final : public AccumulatorState {
public:
explicit AccumulatorLast(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -206,7 +208,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
AccumulatorDocumentsNeeded documentsNeeded() const final {
@@ -217,7 +219,7 @@ private:
Value _last;
};
-class AccumulatorSum final : public Accumulator {
+class AccumulatorSum final : public AccumulatorState {
public:
explicit AccumulatorSum(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -226,7 +228,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
bool isAssociative() const final {
@@ -243,7 +245,7 @@ private:
Decimal128 decimalTotal;
};
-class AccumulatorMinMax : public Accumulator {
+class AccumulatorMinMax : public AccumulatorState {
public:
enum Sense : int {
MIN = 1,
@@ -274,7 +276,7 @@ class AccumulatorMax final : public AccumulatorMinMax {
public:
explicit AccumulatorMax(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorMinMax(expCtx, MAX) {}
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
@@ -282,11 +284,11 @@ class AccumulatorMin final : public AccumulatorMinMax {
public:
explicit AccumulatorMin(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorMinMax(expCtx, MIN) {}
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
-class AccumulatorPush final : public Accumulator {
+class AccumulatorPush final : public AccumulatorState {
public:
/**
* Creates a new $push accumulator. If no memory limit is given, defaults to the value of the
@@ -300,7 +302,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
@@ -308,7 +310,7 @@ private:
int _maxMemUsageBytes;
};
-class AccumulatorAvg final : public Accumulator {
+class AccumulatorAvg final : public AccumulatorState {
public:
explicit AccumulatorAvg(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -317,7 +319,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
@@ -333,7 +335,7 @@ private:
long long _count;
};
-class AccumulatorStdDev : public Accumulator {
+class AccumulatorStdDev : public AccumulatorState {
public:
AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp);
@@ -353,7 +355,7 @@ class AccumulatorStdDevPop final : public AccumulatorStdDev {
public:
explicit AccumulatorStdDevPop(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorStdDev(expCtx, false) {}
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
@@ -361,11 +363,11 @@ class AccumulatorStdDevSamp final : public AccumulatorStdDev {
public:
explicit AccumulatorStdDevSamp(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: AccumulatorStdDev(expCtx, true) {}
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
};
-class AccumulatorMergeObjects : public Accumulator {
+class AccumulatorMergeObjects : public AccumulatorState {
public:
AccumulatorMergeObjects(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -374,7 +376,7 @@ public:
const char* getOpName() const final;
void reset() final;
- static boost::intrusive_ptr<Accumulator> create(
+ static boost::intrusive_ptr<AccumulatorState> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx);
private: