diff options
author | Louis Williams <louis.williams@mongodb.com> | 2020-02-26 16:17:59 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-26 16:17:59 +0000 |
commit | 490c13c0620017dfca4f3e53e0574fef064784e7 (patch) | |
tree | 8c7eba870df9712a4d68300953c933b7970fe12d /src/mongo/db/pipeline/accumulator_js_reduce.cpp | |
parent | ce4f6d6f6e0be8478e1f2f6f728dcc89f4b1e271 (diff) | |
download | mongo-490c13c0620017dfca4f3e53e0574fef064784e7.tar.gz |
Revert "SERVER-45447 Add $accumulator for user-defined Javascript accumulators"
This reverts commit 5b50a111c9361554bc7dbe6a8c63c885a5c29df6.
Diffstat (limited to 'src/mongo/db/pipeline/accumulator_js_reduce.cpp')
-rw-r--r-- | src/mongo/db/pipeline/accumulator_js_reduce.cpp | 247 |
1 files changed, 12 insertions, 235 deletions
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp index 41183c7ac40..877ac1ca41e 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp +++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp @@ -37,8 +37,10 @@ namespace mongo { REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::parseInternalJsReduce); -AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( - boost::intrusive_ptr<ExpressionContext> expCtx, BSONElement elem, VariablesParseState vps) { +std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> +AccumulatorInternalJsReduce::parseInternalJsReduce(boost::intrusive_ptr<ExpressionContext> expCtx, + BSONElement elem, + VariablesParseState vps) { uassert(31326, str::stream() << kAccumulatorName << " requires a document argument, but found " << elem.type(), @@ -46,13 +48,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( BSONObj obj = elem.embeddedObject(); std::string funcSource; - boost::intrusive_ptr<Expression> argument; + boost::intrusive_ptr<Expression> dataExpr; for (auto&& element : obj) { if (element.fieldNameStringData() == "eval") { funcSource = parseReduceFunction(element); } else if (element.fieldNameStringData() == "data") { - argument = Expression::parseOperand(expCtx, element, vps); + dataExpr = Expression::parseOperand(expCtx, element, vps); } else { uasserted(31243, str::stream() << "Invalid argument specified to " << kAccumulatorName << ": " @@ -66,14 +68,13 @@ AccumulationExpression AccumulatorInternalJsReduce::parseInternalJsReduce( uassert(31349, str::stream() << kAccumulatorName << " requires 'data' argument, recieved input: " << obj.toString(false), - argument); + dataExpr); auto factory = [expCtx, funcSource = funcSource]() { return AccumulatorInternalJsReduce::create(expCtx, funcSource); }; - auto initializer = ExpressionConstant::create(expCtx, Value(BSONNULL)); - return {std::move(initializer), std::move(argument), std::move(factory)}; + return {std::move(dataExpr), std::move(factory)}; } std::string AccumulatorInternalJsReduce::parseReduceFunction(BSONElement func) { @@ -167,7 +168,7 @@ Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) { } } -boost::intrusive_ptr<AccumulatorState> AccumulatorInternalJsReduce::create( +boost::intrusive_ptr<Accumulator> AccumulatorInternalJsReduce::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData funcSource) { return make_intrusive<AccumulatorInternalJsReduce>(expCtx, funcSource); @@ -180,233 +181,9 @@ void AccumulatorInternalJsReduce::reset() { } // Returns this accumulator serialized as a Value along with the reduce function. -Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, +Document AccumulatorInternalJsReduce::serialize(boost::intrusive_ptr<Expression> expression, bool explain) const { - return DOC(getOpName() << DOC("data" << argument->serialize(explain) << "eval" << _funcSource)); + return DOC( + getOpName() << DOC("data" << expression->serialize(explain) << "eval" << _funcSource)); } - -REGISTER_ACCUMULATOR(accumulator, AccumulatorJs::parse); - -boost::intrusive_ptr<AccumulatorState> AccumulatorJs::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::string init, - std::string accumulate, - std::string merge, - std::string finalize) { - return new AccumulatorJs( - expCtx, std::move(init), std::move(accumulate), std::move(merge), std::move(finalize)); -} - -namespace { -// Parses a constant expression of type String or Code. -std::string parseFunction(StringData fieldName, - boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps) { - boost::intrusive_ptr<Expression> expr = Expression::parseOperand(expCtx, elem, vps); - expr = expr->optimize(); - ExpressionConstant* ec = dynamic_cast<ExpressionConstant*>(expr.get()); - uassert(4544701, - str::stream() << "$accumulator '" << fieldName << "' must be a constant expression", - ec); - Value v = ec->getValue(); - uassert(4544702, - str::stream() << "$accumulator '" << fieldName << "' must be a String or Code", - v.getType() == BSONType::String || v.getType() == BSONType::Code); - return v.coerceToString(); -} -} // namespace - - -Document AccumulatorJs::serialize(boost::intrusive_ptr<Expression> initializer, - boost::intrusive_ptr<Expression> argument, - bool explain) const { - MutableDocument args; - args.addField("init", Value(_init)); - args.addField("initArgs", Value(initializer->serialize(explain))); - args.addField("accumulate", Value(_accumulate)); - args.addField("accumulateArgs", Value(argument->serialize(explain))); - args.addField("merge", Value(_merge)); - args.addField("finalize", Value(_finalize)); - args.addField("lang", Value("js"_sd)); - return DOC(getOpName() << args.freeze()); -} - -AccumulationExpression AccumulatorJs::parse(boost::intrusive_ptr<ExpressionContext> expCtx, - BSONElement elem, - VariablesParseState vps) { - /* - * {$accumulator: { - * init: <code>, - * accumulate: <code>, - * merge: <code>, - * finalize: <code>, - * - * accumulateArgs: <expr>, // evaluated once per document - * - * initArgs: <expr>, // evaluated once per group - * - * lang: 'js', - * }} - */ - uassert(4544703, - str::stream() << "$accumulator expects an object as an argument; found: " - << typeName(elem.type()), - elem.type() == BSONType::Object); - BSONObj obj = elem.embeddedObject(); - - std::string init, accumulate, merge, finalize; - boost::intrusive_ptr<Expression> initArgs, accumulateArgs; - - for (auto&& element : obj) { - auto name = element.fieldNameStringData(); - if (name == "init") { - init = parseFunction("init", expCtx, element, vps); - } else if (name == "accumulate") { - accumulate = parseFunction("accumulate", expCtx, element, vps); - } else if (name == "merge") { - merge = parseFunction("merge", expCtx, element, vps); - } else if (name == "finalize") { - finalize = parseFunction("finalize", expCtx, element, vps); - } else if (name == "initArgs") { - initArgs = Expression::parseOperand(expCtx, element, vps); - } else if (name == "accumulateArgs") { - accumulateArgs = Expression::parseOperand(expCtx, element, vps); - } else if (name == "lang") { - uassert(4544704, - str::stream() << "$accumulator lang must be a string; found: " - << element.type(), - element.type() == BSONType::String); - uassert(4544705, - "$accumulator only supports lang: 'js'", - element.valueStringData() == "js"); - } else { - // unexpected field - uassert( - 4544706, str::stream() << "$accumulator got an unexpected field: " << name, false); - } - } - uassert(4544707, "$accumulator missing required argument 'init'", !init.empty()); - uassert(4544708, "$accumulator missing required argument 'accumulate'", !accumulate.empty()); - uassert(4544709, "$accumulator missing required argument 'merge'", !merge.empty()); - if (finalize.empty()) { - // finalize is optional because many custom accumulators will return the final state - // unchanged. - finalize = "function(state) { return state; }"; - } - if (!initArgs) { - // initArgs is optional because most custom accumulators don't need the state to depend on - // the group key. - initArgs = ExpressionConstant::create(expCtx, Value(BSONArray())); - } - // accumulateArgs is required because it's the only way to communicate a value from the input - // stream into the accumulator state. - uassert(4544710, "$accumulator missing required argument 'accumulateArgs'", accumulateArgs); - - auto factory = [expCtx = expCtx, - init = std::move(init), - accumulate = std::move(accumulate), - merge = std::move(merge), - finalize = std::move(finalize)]() { - return new AccumulatorJs(expCtx, init, accumulate, merge, finalize); - }; - return {std::move(initArgs), std::move(accumulateArgs), std::move(factory)}; -} - -Value AccumulatorJs::getValue(bool toBeMerged) { - // _state is initialized when we encounter the first document in each group. We never create - // empty groups: even in a {$group: {_id: 1, ...}}, we will return zero groups rather than one - // empty group. - invariant(_state); - - // If toBeMerged then we return the current state, to be fed back in to accumulate / merge / - // finalize later. If not toBeMerged then we return the final value, by calling finalize. - if (toBeMerged) { - return *_state; - } - - // Get the final value given the current accumulator state. - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - auto func = makeJsFunc(expCtx, _finalize); - - return jsExec->callFunction(func, BSON_ARRAY(*_state), {}); -} - -void AccumulatorJs::startNewGroup(Value const& input) { - // Between groups the _state should be empty: we initialize it to be empty it in the - // constructor, and we clear it at the end of each group (in .reset()). - invariant(!_state); - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - auto func = makeJsFunc(expCtx, _init); - - // input is a value produced by our AccumulationExpression::initializer. - uassert(4544711, - str::stream() << "$accumulator initArgs must evaluate to an array: " - << input.toString(), - input.getType() == BSONType::Array); - - size_t index = 0; - BSONArrayBuilder bob; - for (auto&& arg : input.getArray()) { - arg.addToBsonArray(&bob, index++); - } - - _state = jsExec->callFunction(func, bob.arr(), {}); - - recomputeMemUsageBytes(); -} - -void AccumulatorJs::reset() { - _state = std::nullopt; - recomputeMemUsageBytes(); -} - -void AccumulatorJs::processInternal(const Value& input, bool merging) { - // _state should be nonempty because we populate it in startNewGroup. - invariant(_state); - - auto& expCtx = getExpressionContext(); - auto jsExec = expCtx->getJsExecWithScope(); - - if (merging) { - // input is an intermediate state from another instance of this kind of accumulator. Call - // the user's merge function. - auto func = makeJsFunc(expCtx, _merge); - _state = jsExec->callFunction(func, BSON_ARRAY(*_state << input), {}); - recomputeMemUsageBytes(); - } else { - // input is a value produced by our AccumulationExpression::argument. Call the user's - // accumulate function. - auto func = makeJsFunc(expCtx, _accumulate); - uassert(4544712, - str::stream() << "$accumulator accumulateArgs must evaluate to an array: " - << input.toString(), - input.getType() == BSONType::Array); - - size_t index = 0; - BSONArrayBuilder bob; - _state->addToBsonArray(&bob, index++); - for (auto&& arg : input.getArray()) { - arg.addToBsonArray(&bob, index++); - } - - _state = jsExec->callFunction(func, bob.done(), {}); - recomputeMemUsageBytes(); - } -} - -void AccumulatorJs::recomputeMemUsageBytes() { - auto stateSize = _state.value_or(Value{}).getApproximateSize(); - uassert(4544713, - str::stream() << "$accumulator state exceeded max BSON size: " << stateSize, - stateSize <= BSONObjMaxUserSize); - _memUsageBytes = sizeof(*this) + stateSize + _init.capacity() + _accumulate.capacity() + - _merge.capacity() + _finalize.capacity(); -} - } // namespace mongo |