diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2019-10-14 20:58:20 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-14 20:58:20 +0000 |
commit | 2fb44f082935fcefdce1fcb94e0cef2aab6e945e (patch) | |
tree | d842d9241998e704ac05c15b4c7a1f39408cfbc1 | |
parent | c119ef45e3f1c2cc7f36c7b81d0731e332461c2b (diff) | |
download | mongo-2fb44f082935fcefdce1fcb94e0cef2aab6e945e.tar.gz |
SERVER-43796 Support accumulators with an additional static argument
-rw-r--r-- | src/mongo/db/commands/mr_common.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulation_statement.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulation_statement.h | 11 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 18 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_add_to_set.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_avg.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_first.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_js_reduce.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_last.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_merge_objects.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_min_max.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_push.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_std_dev.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator_sum.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 8 |
16 files changed, 59 insertions, 48 deletions
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index ea0dea5e218..1f2e0093919 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -93,10 +93,9 @@ auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string std::pair{"data"s, ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)}, std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})})); - auto jsReduce = AccumulationStatement{ - "value", - std::move(accumulatorArguments), - AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)}; + auto jsReduce = AccumulationStatement{"value", std::move(accumulatorArguments), [expCtx]() { + return AccumulatorInternalJsReduce::create(expCtx); + }}; auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); return DocumentSourceGroup::create(expCtx, std::move(groupExpr), diff --git a/src/mongo/db/pipeline/accumulation_statement.cpp b/src/mongo/db/pipeline/accumulation_statement.cpp index ffa53f4c820..18595477227 100644 --- a/src/mongo/db/pipeline/accumulation_statement.cpp +++ b/src/mongo/db/pipeline/accumulation_statement.cpp @@ -46,27 +46,27 @@ using std::string; namespace { // Used to keep track of which Accumulators are registered under which name. -static StringMap<Accumulator::Factory> factoryMap; +static StringMap<AccumulationStatement::Parser> parserMap; } // namespace -void AccumulationStatement::registerAccumulator(std::string name, Accumulator::Factory factory) { - auto it = factoryMap.find(name); +void AccumulationStatement::registerAccumulator(std::string name, + AccumulationStatement::Parser parser) { + auto it = parserMap.find(name); massert(28722, str::stream() << "Duplicate accumulator (" << name << ") registered.", - it == factoryMap.end()); - factoryMap[name] = factory; + it == parserMap.end()); + parserMap[name] = parser; } -Accumulator::Factory AccumulationStatement::getFactory(StringData name) { - auto it = factoryMap.find(name); +AccumulationStatement::Parser& AccumulationStatement::getParser(StringData name) { + auto it = parserMap.find(name); uassert( - 15952, str::stream() << "unknown group operator '" << name << "'", it != factoryMap.end()); + 15952, str::stream() << "unknown group operator '" << name << "'", it != parserMap.end()); return it->second; } -boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const { - return _factory(expCtx); +boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator() const { + return _factory(); } AccumulationStatement AccumulationStatement::parseAccumulationStatement( @@ -97,9 +97,10 @@ AccumulationStatement AccumulationStatement::parseAccumulationStatement( str::stream() << "The " << accName << " accumulator is a unary operator", specElem.type() != BSONType::Array); - return {fieldName.toString(), - Expression::parseOperand(expCtx, specElem, vps), - AccumulationStatement::getFactory(accName)}; + auto&& parser = AccumulationStatement::getParser(accName); + auto [expression, factory] = parser(expCtx, specElem, vps); + + return AccumulationStatement(fieldName.toString(), expression, factory); } } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulation_statement.h b/src/mongo/db/pipeline/accumulation_statement.h index 07becc751e1..14b8ea953ab 100644 --- a/src/mongo/db/pipeline/accumulation_statement.h +++ b/src/mongo/db/pipeline/accumulation_statement.h @@ -58,6 +58,8 @@ namespace mongo { */ class AccumulationStatement { public: + using Parser = std::function<std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>( + boost::intrusive_ptr<ExpressionContext>, BSONElement, VariablesParseState)>; AccumulationStatement(std::string fieldName, boost::intrusive_ptr<Expression> expression, Accumulator::Factory factory) @@ -84,13 +86,13 @@ public: * DO NOT call this method directly. Instead, use the REGISTER_ACCUMULATOR macro defined in this * file. */ - static void registerAccumulator(std::string name, Accumulator::Factory factory); + static void registerAccumulator(std::string name, Parser parser); /** - * Retrieves the Factory for the accumulator specified by the given name, and raises an error if + * Retrieves the Parser for the accumulator specified by the given name, and raises an error if * there is no such Accumulator registered. */ - static Accumulator::Factory getFactory(StringData name); + static Parser& getParser(StringData name); // The field name is used to store the results of the accumulation in a result document. std::string fieldName; @@ -99,8 +101,7 @@ public: boost::intrusive_ptr<Expression> expression; // Constructs an Accumulator to do actual accumulation. - boost::intrusive_ptr<Accumulator> makeAccumulator( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const; + boost::intrusive_ptr<Accumulator> makeAccumulator() const; private: Accumulator::Factory _factory; diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 26dc64ecaeb..4219211a565 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -65,8 +65,7 @@ enum class AccumulatorDocumentsNeeded { class Accumulator : public RefCountable { public: - using Factory = boost::intrusive_ptr<Accumulator> (*)( - const boost::intrusive_ptr<ExpressionContext>& expCtx); + using Factory = std::function<boost::intrusive_ptr<Accumulator>()>; Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {} @@ -121,6 +120,19 @@ private: boost::intrusive_ptr<ExpressionContext> _expCtx; }; +/** + * A default parser for any accumulator that only takes a single expression as an argument. Returns + * the expression to be evaluated by the accumulator and an Accumulator::Factory. + */ +template <class AccName> +std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory> +genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx, + BSONElement elem, + VariablesParseState vps) { + auto exprValue = Expression::parseOperand(expCtx, elem, vps); + return {exprValue, [expCtx]() { return AccName::create(expCtx); }}; +} + class AccumulatorAddToSet final : public Accumulator { public: @@ -146,7 +158,6 @@ private: ValueUnorderedSet _set; }; - class AccumulatorFirst final : public Accumulator { public: explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -303,7 +314,6 @@ private: long long _count; }; - class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp); diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp index 27f1b1d2958..b8f366f6c4f 100644 --- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp +++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp @@ -40,7 +40,7 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; -REGISTER_ACCUMULATOR(addToSet, AccumulatorAddToSet::create); +REGISTER_ACCUMULATOR(addToSet, genericParseSingleExpressionAccumulator<AccumulatorAddToSet>); const char* AccumulatorAddToSet::getOpName() const { return "$addToSet"; diff --git a/src/mongo/db/pipeline/accumulator_avg.cpp b/src/mongo/db/pipeline/accumulator_avg.cpp index fd989b08037..2efc2cee191 100644 --- a/src/mongo/db/pipeline/accumulator_avg.cpp +++ b/src/mongo/db/pipeline/accumulator_avg.cpp @@ -42,7 +42,7 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(avg, AccumulatorAvg::create); +REGISTER_ACCUMULATOR(avg, genericParseSingleExpressionAccumulator<AccumulatorAvg>); REGISTER_EXPRESSION(avg, ExpressionFromAccumulator<AccumulatorAvg>::parse); const char* AccumulatorAvg::getOpName() const { diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp index 92d759af8fb..aed285cf5db 100644 --- a/src/mongo/db/pipeline/accumulator_first.cpp +++ b/src/mongo/db/pipeline/accumulator_first.cpp @@ -38,7 +38,7 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(first, AccumulatorFirst::create); +REGISTER_ACCUMULATOR(first, genericParseSingleExpressionAccumulator<AccumulatorFirst>); const char* AccumulatorFirst::getOpName() const { return "$first"; diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp index 0ec03295a35..37a5103e684 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp +++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp @@ -37,7 +37,8 @@ namespace mongo { -REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::create); +REGISTER_ACCUMULATOR(_internalJsReduce, + genericParseSingleExpressionAccumulator<AccumulatorInternalJsReduce>); void AccumulatorInternalJsReduce::processInternal(const Value& input, bool merging) { if (input.missing()) { @@ -143,5 +144,4 @@ void AccumulatorInternalJsReduce::reset() { _memUsageBytes = sizeof(*this); _key = Value{}; } - } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_last.cpp b/src/mongo/db/pipeline/accumulator_last.cpp index 93f5d477a46..150360d4fdd 100644 --- a/src/mongo/db/pipeline/accumulator_last.cpp +++ b/src/mongo/db/pipeline/accumulator_last.cpp @@ -38,7 +38,7 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(last, AccumulatorLast::create); +REGISTER_ACCUMULATOR(last, genericParseSingleExpressionAccumulator<AccumulatorLast>); const char* AccumulatorLast::getOpName() const { return "$last"; diff --git a/src/mongo/db/pipeline/accumulator_merge_objects.cpp b/src/mongo/db/pipeline/accumulator_merge_objects.cpp index 9ffa78965b1..d1e6310ea23 100644 --- a/src/mongo/db/pipeline/accumulator_merge_objects.cpp +++ b/src/mongo/db/pipeline/accumulator_merge_objects.cpp @@ -41,7 +41,8 @@ using boost::intrusive_ptr; /* ------------------------- AccumulatorMergeObjects ----------------------------- */ -REGISTER_ACCUMULATOR(mergeObjects, AccumulatorMergeObjects::create); +REGISTER_ACCUMULATOR(mergeObjects, + genericParseSingleExpressionAccumulator<AccumulatorMergeObjects>); REGISTER_EXPRESSION(mergeObjects, ExpressionFromAccumulator<AccumulatorMergeObjects>::parse); const char* AccumulatorMergeObjects::getOpName() const { @@ -89,5 +90,4 @@ void AccumulatorMergeObjects::processInternal(const Value& input, bool merging) Value AccumulatorMergeObjects::getValue(bool toBeMerged) { return _output.freezeToValue(); } - } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp index d3e3f4fe621..c1af0ff8226 100644 --- a/src/mongo/db/pipeline/accumulator_min_max.cpp +++ b/src/mongo/db/pipeline/accumulator_min_max.cpp @@ -39,8 +39,8 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(max, AccumulatorMax::create); -REGISTER_ACCUMULATOR(min, AccumulatorMin::create); +REGISTER_ACCUMULATOR(max, genericParseSingleExpressionAccumulator<AccumulatorMax>); +REGISTER_ACCUMULATOR(min, genericParseSingleExpressionAccumulator<AccumulatorMin>); REGISTER_EXPRESSION(max, ExpressionFromAccumulator<AccumulatorMax>::parse); REGISTER_EXPRESSION(min, ExpressionFromAccumulator<AccumulatorMin>::parse); diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp index 147a2d7a353..b0d93ff9a4f 100644 --- a/src/mongo/db/pipeline/accumulator_push.cpp +++ b/src/mongo/db/pipeline/accumulator_push.cpp @@ -40,7 +40,7 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; -REGISTER_ACCUMULATOR(push, AccumulatorPush::create); +REGISTER_ACCUMULATOR(push, genericParseSingleExpressionAccumulator<AccumulatorPush>); const char* AccumulatorPush::getOpName() const { return "$push"; diff --git a/src/mongo/db/pipeline/accumulator_std_dev.cpp b/src/mongo/db/pipeline/accumulator_std_dev.cpp index 98f5bd80ddc..55367d766be 100644 --- a/src/mongo/db/pipeline/accumulator_std_dev.cpp +++ b/src/mongo/db/pipeline/accumulator_std_dev.cpp @@ -40,8 +40,8 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(stdDevPop, AccumulatorStdDevPop::create); -REGISTER_ACCUMULATOR(stdDevSamp, AccumulatorStdDevSamp::create); +REGISTER_ACCUMULATOR(stdDevPop, genericParseSingleExpressionAccumulator<AccumulatorStdDevPop>); +REGISTER_ACCUMULATOR(stdDevSamp, genericParseSingleExpressionAccumulator<AccumulatorStdDevSamp>); REGISTER_EXPRESSION(stdDevPop, ExpressionFromAccumulator<AccumulatorStdDevPop>::parse); REGISTER_EXPRESSION(stdDevSamp, ExpressionFromAccumulator<AccumulatorStdDevSamp>::parse); diff --git a/src/mongo/db/pipeline/accumulator_sum.cpp b/src/mongo/db/pipeline/accumulator_sum.cpp index 81bc6c6af3e..cd1d4a2eabe 100644 --- a/src/mongo/db/pipeline/accumulator_sum.cpp +++ b/src/mongo/db/pipeline/accumulator_sum.cpp @@ -43,7 +43,7 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(sum, AccumulatorSum::create); +REGISTER_ACCUMULATOR(sum, genericParseSingleExpressionAccumulator<AccumulatorSum>); REGISTER_EXPRESSION(sum, ExpressionFromAccumulator<AccumulatorSum>::parse); const char* AccumulatorSum::getOpName() const { diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 26cb0e9a89b..eba38c5c58b 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -311,7 +311,7 @@ DocumentSourceBucketAuto::Bucket::Bucket( : _min(min), _max(max) { _accums.reserve(accumulationStatements.size()); for (auto&& accumulationStatement : accumulationStatements) { - _accums.push_back(accumulationStatement.makeAccumulator(expCtx)); + _accums.push_back(accumulationStatement.makeAccumulator()); } } @@ -382,7 +382,7 @@ Value DocumentSourceBucketAuto::serialize( MutableDocument outputSpec(_accumulatedFields.size()); for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx); + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(); outputSpec[accumulatedField.fieldName] = Value{Document{{accum->getOpName(), accumulatedField.expression->serialize(static_cast<bool>(explain))}}}; @@ -407,7 +407,7 @@ intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create( if (accumulationStatements.empty()) { accumulationStatements.emplace_back("count", ExpressionConstant::create(pExpCtx, Value(1)), - AccumulationStatement::getFactory("$sum")); + [pExpCtx] { return AccumulatorSum::create(pExpCtx); }); } return new DocumentSourceBucketAuto(pExpCtx, groupByExpression, diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index e15c3e48382..4f04626c341 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -242,7 +242,7 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> // Add the remaining fields. for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx); + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(); insides[accumulatedField.fieldName] = Value(DOC(accum->getOpName() << accumulatedField.expression->serialize(static_cast<bool>(explain)))); @@ -497,7 +497,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // Add the accumulators group.reserve(numAccumulators); for (auto&& accumulatedField : _accumulatedFields) { - group.push_back(accumulatedField.makeAccumulator(pExpCtx)); + group.push_back(accumulatedField.makeAccumulator()); } } else { for (auto&& groupObj : group) { @@ -557,7 +557,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // prepare current to accumulate data _currentAccumulators.reserve(numAccumulators); for (auto&& accumulatedField : _accumulatedFields) { - _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx)); + _currentAccumulators.push_back(accumulatedField.makeAccumulator()); } verify(_sorterIterator->more()); // we put data in, we should get something out. @@ -769,7 +769,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { // We can't do this transformation if there are any non-$first accumulators. for (auto&& accumulator : _accumulatedFields) { if (AccumulatorDocumentsNeeded::kFirstDocument != - accumulator.makeAccumulator(pExpCtx)->documentsNeeded()) { + accumulator.makeAccumulator()->documentsNeeded()) { return nullptr; } } |