summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/accumulator_js_reduce.cpp
diff options
context:
space:
mode:
author: Louis Williams <louis.williams@mongodb.com> 2020-02-26 16:17:59 +0000
committer: evergreen <evergreen@mongodb.com> 2020-02-26 16:17:59 +0000
commit: 490c13c0620017dfca4f3e53e0574fef064784e7 (patch)
tree: 8c7eba870df9712a4d68300953c933b7970fe12d /src/mongo/db/pipeline/accumulator_js_reduce.cpp
parent: ce4f6d6f6e0be8478e1f2f6f728dcc89f4b1e271 (diff)
download: mongo-490c13c0620017dfca4f3e53e0574fef064784e7.tar.gz
Revert "SERVER-45447 Add $accumulator for user-defined Javascript accumulators"
This reverts commit 5b50a111c9361554bc7dbe6a8c63c885a5c29df6.
Diffstat (limited to 'src/mongo/db/pipeline/accumulator_js_reduce.cpp')
-rw-r--r-- src/mongo/db/pipeline/accumulator_js_reduce.cpp 247
1 files changed, 12 insertions, 235 deletions
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
index 41183c7ac40..877ac1ca41e 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
@@ -37,8 +37,10 @@ namespace mongo {
REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::parseInternalJsReduce);
-AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
- boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) {
+std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>
+AccumulatorInternalJsReduce::parseInternalJsReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
+ BSONElement elem,
+ VariablesParseState vps) {
uassert(31326,
str::stream() << kAccumulatorName << " requires a document argument, but found "
<< elem.type(),
@@ -46,13 +48,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
BSONObj obj = elem.embeddedObject();
std::string funcSource;
- boost::intrusive_ptr<Expression> argument;
+ boost::intrusive_ptr<Expression> dataExpr;
for (auto&& element : obj) {
if (element.fieldNameStringData() == "eval") {
funcSource = parseReduceFunction(element);
} else if (element.fieldNameStringData() == "data") {
- argument = Expression::parseOperand(expCtx, element, vps);
+ dataExpr = Expression::parseOperand(expCtx, element, vps);
} else {
uasserted(31243,
str::stream() << "Invalid argument specified to " << kAccumulatorName << ": "
@@ -66,14 +68,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce(
uassert(31349,
str::stream() << kAccumulatorName
<< " requires 'data' argument, recieved input: " << obj.toString(false),
- argument);
+ dataExpr);
auto factory = [expCtx, funcSource = funcSource]() {
return AccumulatorInternalJsReduce::create(expCtx, funcSource);
};
- auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL));
- return {std::move(initializer), std::move(argument), std::move(factory)};
+ return {std::move(dataExpr), std::move(factory)};
}
std::string AccumulatorInternalJsReduce::parseReduceFunction(BSONElement func) {
@@ -167,7 +168,7 @@ Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) {
}
}
-boost::intrusive_ptr<AccumulatorState> AccumulatorInternalJsReduce::create(
+boost::intrusive_ptr<Accumulator> AccumulatorInternalJsReduce::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource) {
return make_intrusive<AccumulatorInternalJsReduce>(expCtx, funcSource);
@@ -180,233 +181,9 @@ void AccumulatorInternalJsReduce::reset() {
}
// Returns this accumulator serialized as a Value along with the reduce function.
-Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
+Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> expression,
bool explain) const {
- return DOC(getOpName() << DOC("data" << argument->serialize(explain) << "eval" << _funcSource));
+ return DOC(
+ getOpName() << DOC("data" << expression->serialize(explain) << "eval" << _funcSource));
}
-
-REGISTER_ACCUMULATOR(accumulator, AccumulatorJs::parse);
-
-boost::intrusive_ptr<AccumulatorState> AccumulatorJs::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- std::string init,
- std::string accumulate,
- std::string merge,
- std::string finalize) {
- return new AccumulatorJs(
- expCtx, std::move(init), std::move(accumulate), std::move(merge), std::move(finalize));
-}
-
-namespace {
-// Parses a constant expression of type String or Code.
-std::string parseFunction(StringData fieldName,
- boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps) {
- boost::intrusive_ptr<Expression> expr = Expression::parseOperand(expCtx, elem, vps);
- expr = expr->optimize();
- ExpressionConstant* ec = dynamic_cast<ExpressionConstant*>(expr.get());
- uassert(4544701,
- str::stream() << "$accumulator '" << fieldName << "' must be a constant expression",
- ec);
- Value v = ec->getValue();
- uassert(4544702,
- str::stream() << "$accumulator '" << fieldName << "' must be a String or Code",
- v.getType() == BSONType::String || v.getType() == BSONType::Code);
- return v.coerceToString();
-}
-} // namespace
-
-
-Document AccumulatorJs::serialize(boost::intrusive_ptr<Expression> initializer,
- boost::intrusive_ptr<Expression> argument,
- bool explain) const {
- MutableDocument args;
- args.addField("init", Value(_init));
- args.addField("initArgs", Value(initializer->serialize(explain)));
- args.addField("accumulate", Value(_accumulate));
- args.addField("accumulateArgs", Value(argument->serialize(explain)));
- args.addField("merge", Value(_merge));
- args.addField("finalize", Value(_finalize));
- args.addField("lang", Value("js"_sd));
- return DOC(getOpName() << args.freeze());
-}
-
-AccumulationExpression AccumulatorJs::parse(boost::intrusive_ptr<ExpressionContext> expCtx,
- BSONElement elem,
- VariablesParseState vps) {
- /*
- * {$accumulator: {
- * init: <code>,
- * accumulate: <code>,
- * merge: <code>,
- * finalize: <code>,
- *
- * accumulateArgs: <expr>, // evaluated once per document
- *
- * initArgs: <expr>, // evaluated once per group
- *
- * lang: 'js',
- * }}
- */
- uassert(4544703,
- str::stream() << "$accumulator expects an object as an argument; found: "
- << typeName(elem.type()),
- elem.type() == BSONType::Object);
- BSONObj obj = elem.embeddedObject();
-
- std::string init, accumulate, merge, finalize;
- boost::intrusive_ptr<Expression> initArgs, accumulateArgs;
-
- for (auto&& element : obj) {
- auto name = element.fieldNameStringData();
- if (name == "init") {
- init = parseFunction("init", expCtx, element, vps);
- } else if (name == "accumulate") {
- accumulate = parseFunction("accumulate", expCtx, element, vps);
- } else if (name == "merge") {
- merge = parseFunction("merge", expCtx, element, vps);
- } else if (name == "finalize") {
- finalize = parseFunction("finalize", expCtx, element, vps);
- } else if (name == "initArgs") {
- initArgs = Expression::parseOperand(expCtx, element, vps);
- } else if (name == "accumulateArgs") {
- accumulateArgs = Expression::parseOperand(expCtx, element, vps);
- } else if (name == "lang") {
- uassert(4544704,
- str::stream() << "$accumulator lang must be a string; found: "
- << element.type(),
- element.type() == BSONType::String);
- uassert(4544705,
- "$accumulator only supports lang: 'js'",
- element.valueStringData() == "js");
- } else {
- // unexpected field
- uassert(
- 4544706, str::stream() << "$accumulator got an unexpected field: " << name, false);
- }
- }
- uassert(4544707, "$accumulator missing required argument 'init'", !init.empty());
- uassert(4544708, "$accumulator missing required argument 'accumulate'", !accumulate.empty());
- uassert(4544709, "$accumulator missing required argument 'merge'", !merge.empty());
- if (finalize.empty()) {
- // finalize is optional because many custom accumulators will return the final state
- // unchanged.
- finalize = "function(state) { return state; }";
- }
- if (!initArgs) {
- // initArgs is optional because most custom accumulators don't need the state to depend on
- // the group key.
- initArgs = ExpressionConstant::create(expCtx, Value(BSONArray()));
- }
- // accumulateArgs is required because it's the only way to communicate a value from the input
- // stream into the accumulator state.
- uassert(4544710, "$accumulator missing required argument 'accumulateArgs'", accumulateArgs);
-
- auto factory = [expCtx = expCtx,
- init = std::move(init),
- accumulate = std::move(accumulate),
- merge = std::move(merge),
- finalize = std::move(finalize)]() {
- return new AccumulatorJs(expCtx, init, accumulate, merge, finalize);
- };
- return {std::move(initArgs), std::move(accumulateArgs), std::move(factory)};
-}
-
-Value AccumulatorJs::getValue(bool toBeMerged) {
- // _state is initialized when we encounter the first document in each group. We never create
- // empty groups: even in a {$group: {_id: 1, ...}}, we will return zero groups rather than one
- // empty group.
- invariant(_state);
-
- // If toBeMerged then we return the current state, to be fed back in to accumulate / merge /
- // finalize later. If not toBeMerged then we return the final value, by calling finalize.
- if (toBeMerged) {
- return *_state;
- }
-
- // Get the final value given the current accumulator state.
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
- auto func = makeJsFunc(expCtx, _finalize);
-
- return jsExec->callFunction(func, BSON_ARRAY(*_state), {});
-}
-
-void AccumulatorJs::startNewGroup(Value const& input) {
- // Between groups the _state should be empty: we initialize it to be empty it in the
- // constructor, and we clear it at the end of each group (in .reset()).
- invariant(!_state);
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
- auto func = makeJsFunc(expCtx, _init);
-
- // input is a value produced by our AccumulationExpression::initializer.
- uassert(4544711,
- str::stream() << "$accumulator initArgs must evaluate to an array: "
- << input.toString(),
- input.getType() == BSONType::Array);
-
- size_t index = 0;
- BSONArrayBuilder bob;
- for (auto&& arg : input.getArray()) {
- arg.addToBsonArray(&bob, index++);
- }
-
- _state = jsExec->callFunction(func, bob.arr(), {});
-
- recomputeMemUsageBytes();
-}
-
-void AccumulatorJs::reset() {
- _state = std::nullopt;
- recomputeMemUsageBytes();
-}
-
-void AccumulatorJs::processInternal(const Value& input, bool merging) {
- // _state should be nonempty because we populate it in startNewGroup.
- invariant(_state);
-
- auto& expCtx = getExpressionContext();
- auto jsExec = expCtx->getJsExecWithScope();
-
- if (merging) {
- // input is an intermediate state from another instance of this kind of accumulator. Call
- // the user's merge function.
- auto func = makeJsFunc(expCtx, _merge);
- _state = jsExec->callFunction(func, BSON_ARRAY(*_state << input), {});
- recomputeMemUsageBytes();
- } else {
- // input is a value produced by our AccumulationExpression::argument. Call the user's
- // accumulate function.
- auto func = makeJsFunc(expCtx, _accumulate);
- uassert(4544712,
- str::stream() << "$accumulator accumulateArgs must evaluate to an array: "
- << input.toString(),
- input.getType() == BSONType::Array);
-
- size_t index = 0;
- BSONArrayBuilder bob;
- _state->addToBsonArray(&bob, index++);
- for (auto&& arg : input.getArray()) {
- arg.addToBsonArray(&bob, index++);
- }
-
- _state = jsExec->callFunction(func, bob.done(), {});
- recomputeMemUsageBytes();
- }
-}
-
-void AccumulatorJs::recomputeMemUsageBytes() {
- auto stateSize = _state.value_or(Value{}).getApproximateSize();
- uassert(4544713,
- str::stream() << "$accumulator state exceeded max BSON size: " << stateSize,
- stateSize <= BSONObjMaxUserSize);
- _memUsageBytes = sizeof(*this) + stateSize + _init.capacity() + _accumulate.capacity() +
- _merge.capacity() + _finalize.capacity();
-}
-
} // namespace mongo