summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2019-10-14 20:58:20 +0000
committerevergreen <evergreen@mongodb.com>2019-10-14 20:58:20 +0000
commit2fb44f082935fcefdce1fcb94e0cef2aab6e945e (patch)
treed842d9241998e704ac05c15b4c7a1f39408cfbc1
parentc119ef45e3f1c2cc7f36c7b81d0731e332461c2b (diff)
downloadmongo-2fb44f082935fcefdce1fcb94e0cef2aab6e945e.tar.gz
SERVER-43796 Support accumulators with an additional static argument
-rw-r--r--src/mongo/db/commands/mr_common.cpp7
-rw-r--r--src/mongo/db/pipeline/accumulation_statement.cpp29
-rw-r--r--src/mongo/db/pipeline/accumulation_statement.h11
-rw-r--r--src/mongo/db/pipeline/accumulator.h18
-rw-r--r--src/mongo/db/pipeline/accumulator_add_to_set.cpp2
-rw-r--r--src/mongo/db/pipeline/accumulator_avg.cpp2
-rw-r--r--src/mongo/db/pipeline/accumulator_first.cpp2
-rw-r--r--src/mongo/db/pipeline/accumulator_js_reduce.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_last.cpp2
-rw-r--r--src/mongo/db/pipeline/accumulator_merge_objects.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_min_max.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_push.cpp2
-rw-r--r--src/mongo/db/pipeline/accumulator_std_dev.cpp4
-rw-r--r--src/mongo/db/pipeline/accumulator_sum.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp8
16 files changed, 59 insertions, 48 deletions
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index ea0dea5e218..1f2e0093919 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -93,10 +93,9 @@ auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string
std::pair{"data"s,
ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)},
std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})}));
- auto jsReduce = AccumulationStatement{
- "value",
- std::move(accumulatorArguments),
- AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)};
+ auto jsReduce = AccumulationStatement{"value", std::move(accumulatorArguments), [expCtx]() {
+ return AccumulatorInternalJsReduce::create(expCtx);
+ }};
auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
return DocumentSourceGroup::create(expCtx,
std::move(groupExpr),
diff --git a/src/mongo/db/pipeline/accumulation_statement.cpp b/src/mongo/db/pipeline/accumulation_statement.cpp
index ffa53f4c820..18595477227 100644
--- a/src/mongo/db/pipeline/accumulation_statement.cpp
+++ b/src/mongo/db/pipeline/accumulation_statement.cpp
@@ -46,27 +46,27 @@ using std::string;
namespace {
// Used to keep track of which Accumulators are registered under which name.
-static StringMap<Accumulator::Factory> factoryMap;
+static StringMap<AccumulationStatement::Parser> parserMap;
} // namespace
-void AccumulationStatement::registerAccumulator(std::string name, Accumulator::Factory factory) {
- auto it = factoryMap.find(name);
+void AccumulationStatement::registerAccumulator(std::string name,
+ AccumulationStatement::Parser parser) {
+ auto it = parserMap.find(name);
massert(28722,
str::stream() << "Duplicate accumulator (" << name << ") registered.",
- it == factoryMap.end());
- factoryMap[name] = factory;
+ it == parserMap.end());
+ parserMap[name] = parser;
}
-Accumulator::Factory AccumulationStatement::getFactory(StringData name) {
- auto it = factoryMap.find(name);
+AccumulationStatement::Parser& AccumulationStatement::getParser(StringData name) {
+ auto it = parserMap.find(name);
uassert(
- 15952, str::stream() << "unknown group operator '" << name << "'", it != factoryMap.end());
+ 15952, str::stream() << "unknown group operator '" << name << "'", it != parserMap.end());
return it->second;
}
-boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
- return _factory(expCtx);
+boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator() const {
+ return _factory();
}
AccumulationStatement AccumulationStatement::parseAccumulationStatement(
@@ -97,9 +97,10 @@ AccumulationStatement AccumulationStatement::parseAccumulationStatement(
str::stream() << "The " << accName << " accumulator is a unary operator",
specElem.type() != BSONType::Array);
- return {fieldName.toString(),
- Expression::parseOperand(expCtx, specElem, vps),
- AccumulationStatement::getFactory(accName)};
+ auto&& parser = AccumulationStatement::getParser(accName);
+ auto [expression, factory] = parser(expCtx, specElem, vps);
+
+ return AccumulationStatement(fieldName.toString(), expression, factory);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulation_statement.h b/src/mongo/db/pipeline/accumulation_statement.h
index 07becc751e1..14b8ea953ab 100644
--- a/src/mongo/db/pipeline/accumulation_statement.h
+++ b/src/mongo/db/pipeline/accumulation_statement.h
@@ -58,6 +58,8 @@ namespace mongo {
*/
class AccumulationStatement {
public:
+ using Parser = std::function<std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>(
+ boost::intrusive_ptr<ExpressionContext>, BSONElement, VariablesParseState)>;
AccumulationStatement(std::string fieldName,
boost::intrusive_ptr<Expression> expression,
Accumulator::Factory factory)
@@ -84,13 +86,13 @@ public:
* DO NOT call this method directly. Instead, use the REGISTER_ACCUMULATOR macro defined in this
* file.
*/
- static void registerAccumulator(std::string name, Accumulator::Factory factory);
+ static void registerAccumulator(std::string name, Parser parser);
/**
- * Retrieves the Factory for the accumulator specified by the given name, and raises an error if
+ * Retrieves the Parser for the accumulator specified by the given name, and raises an error if
* there is no such Accumulator registered.
*/
- static Accumulator::Factory getFactory(StringData name);
+ static Parser& getParser(StringData name);
// The field name is used to store the results of the accumulation in a result document.
std::string fieldName;
@@ -99,8 +101,7 @@ public:
boost::intrusive_ptr<Expression> expression;
// Constructs an Accumulator to do actual accumulation.
- boost::intrusive_ptr<Accumulator> makeAccumulator(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const;
+ boost::intrusive_ptr<Accumulator> makeAccumulator() const;
private:
Accumulator::Factory _factory;
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 26dc64ecaeb..4219211a565 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -65,8 +65,7 @@ enum class AccumulatorDocumentsNeeded {
class Accumulator : public RefCountable {
public:
- using Factory = boost::intrusive_ptr<Accumulator> (*)(
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ using Factory = std::function<boost::intrusive_ptr<Accumulator>()>;
Accumulator(const boost::intrusive_ptr<ExpressionContext>& expCtx) : _expCtx(expCtx) {}
@@ -121,6 +120,19 @@ private:
boost::intrusive_ptr<ExpressionContext> _expCtx;
};
+/**
+ * A default parser for any accumulator that only takes a single expression as an argument. Returns
+ * the expression to be evaluated by the accumulator and an Accumulator::Factory.
+ */
+template <class AccName>
+std::pair<boost::intrusive_ptr<Expression>, Accumulator::Factory>
+genericParseSingleExpressionAccumulator(boost::intrusive_ptr<ExpressionContext> expCtx,
+ BSONElement elem,
+ VariablesParseState vps) {
+ auto exprValue = Expression::parseOperand(expCtx, elem, vps);
+ return {exprValue, [expCtx]() { return AccName::create(expCtx); }};
+}
+
class AccumulatorAddToSet final : public Accumulator {
public:
@@ -146,7 +158,6 @@ private:
ValueUnorderedSet _set;
};
-
class AccumulatorFirst final : public Accumulator {
public:
explicit AccumulatorFirst(const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -303,7 +314,6 @@ private:
long long _count;
};
-
class AccumulatorStdDev : public Accumulator {
public:
AccumulatorStdDev(const boost::intrusive_ptr<ExpressionContext>& expCtx, bool isSamp);
diff --git a/src/mongo/db/pipeline/accumulator_add_to_set.cpp b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
index 27f1b1d2958..b8f366f6c4f 100644
--- a/src/mongo/db/pipeline/accumulator_add_to_set.cpp
+++ b/src/mongo/db/pipeline/accumulator_add_to_set.cpp
@@ -40,7 +40,7 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-REGISTER_ACCUMULATOR(addToSet, AccumulatorAddToSet::create);
+REGISTER_ACCUMULATOR(addToSet, genericParseSingleExpressionAccumulator<AccumulatorAddToSet>);
const char* AccumulatorAddToSet::getOpName() const {
return "$addToSet";
diff --git a/src/mongo/db/pipeline/accumulator_avg.cpp b/src/mongo/db/pipeline/accumulator_avg.cpp
index fd989b08037..2efc2cee191 100644
--- a/src/mongo/db/pipeline/accumulator_avg.cpp
+++ b/src/mongo/db/pipeline/accumulator_avg.cpp
@@ -42,7 +42,7 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(avg, AccumulatorAvg::create);
+REGISTER_ACCUMULATOR(avg, genericParseSingleExpressionAccumulator<AccumulatorAvg>);
REGISTER_EXPRESSION(avg, ExpressionFromAccumulator<AccumulatorAvg>::parse);
const char* AccumulatorAvg::getOpName() const {
diff --git a/src/mongo/db/pipeline/accumulator_first.cpp b/src/mongo/db/pipeline/accumulator_first.cpp
index 92d759af8fb..aed285cf5db 100644
--- a/src/mongo/db/pipeline/accumulator_first.cpp
+++ b/src/mongo/db/pipeline/accumulator_first.cpp
@@ -38,7 +38,7 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(first, AccumulatorFirst::create);
+REGISTER_ACCUMULATOR(first, genericParseSingleExpressionAccumulator<AccumulatorFirst>);
const char* AccumulatorFirst::getOpName() const {
return "$first";
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
index 0ec03295a35..37a5103e684 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
@@ -37,7 +37,8 @@
namespace mongo {
-REGISTER_ACCUMULATOR(_internalJsReduce, AccumulatorInternalJsReduce::create);
+REGISTER_ACCUMULATOR(_internalJsReduce,
+ genericParseSingleExpressionAccumulator<AccumulatorInternalJsReduce>);
void AccumulatorInternalJsReduce::processInternal(const Value& input, bool merging) {
if (input.missing()) {
@@ -143,5 +144,4 @@ void AccumulatorInternalJsReduce::reset() {
_memUsageBytes = sizeof(*this);
_key = Value{};
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_last.cpp b/src/mongo/db/pipeline/accumulator_last.cpp
index 93f5d477a46..150360d4fdd 100644
--- a/src/mongo/db/pipeline/accumulator_last.cpp
+++ b/src/mongo/db/pipeline/accumulator_last.cpp
@@ -38,7 +38,7 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(last, AccumulatorLast::create);
+REGISTER_ACCUMULATOR(last, genericParseSingleExpressionAccumulator<AccumulatorLast>);
const char* AccumulatorLast::getOpName() const {
return "$last";
diff --git a/src/mongo/db/pipeline/accumulator_merge_objects.cpp b/src/mongo/db/pipeline/accumulator_merge_objects.cpp
index 9ffa78965b1..d1e6310ea23 100644
--- a/src/mongo/db/pipeline/accumulator_merge_objects.cpp
+++ b/src/mongo/db/pipeline/accumulator_merge_objects.cpp
@@ -41,7 +41,8 @@ using boost::intrusive_ptr;
/* ------------------------- AccumulatorMergeObjects ----------------------------- */
-REGISTER_ACCUMULATOR(mergeObjects, AccumulatorMergeObjects::create);
+REGISTER_ACCUMULATOR(mergeObjects,
+ genericParseSingleExpressionAccumulator<AccumulatorMergeObjects>);
REGISTER_EXPRESSION(mergeObjects, ExpressionFromAccumulator<AccumulatorMergeObjects>::parse);
const char* AccumulatorMergeObjects::getOpName() const {
@@ -89,5 +90,4 @@ void AccumulatorMergeObjects::processInternal(const Value& input, bool merging)
Value AccumulatorMergeObjects::getValue(bool toBeMerged) {
return _output.freezeToValue();
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp
index d3e3f4fe621..c1af0ff8226 100644
--- a/src/mongo/db/pipeline/accumulator_min_max.cpp
+++ b/src/mongo/db/pipeline/accumulator_min_max.cpp
@@ -39,8 +39,8 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(max, AccumulatorMax::create);
-REGISTER_ACCUMULATOR(min, AccumulatorMin::create);
+REGISTER_ACCUMULATOR(max, genericParseSingleExpressionAccumulator<AccumulatorMax>);
+REGISTER_ACCUMULATOR(min, genericParseSingleExpressionAccumulator<AccumulatorMin>);
REGISTER_EXPRESSION(max, ExpressionFromAccumulator<AccumulatorMax>::parse);
REGISTER_EXPRESSION(min, ExpressionFromAccumulator<AccumulatorMin>::parse);
diff --git a/src/mongo/db/pipeline/accumulator_push.cpp b/src/mongo/db/pipeline/accumulator_push.cpp
index 147a2d7a353..b0d93ff9a4f 100644
--- a/src/mongo/db/pipeline/accumulator_push.cpp
+++ b/src/mongo/db/pipeline/accumulator_push.cpp
@@ -40,7 +40,7 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-REGISTER_ACCUMULATOR(push, AccumulatorPush::create);
+REGISTER_ACCUMULATOR(push, genericParseSingleExpressionAccumulator<AccumulatorPush>);
const char* AccumulatorPush::getOpName() const {
return "$push";
diff --git a/src/mongo/db/pipeline/accumulator_std_dev.cpp b/src/mongo/db/pipeline/accumulator_std_dev.cpp
index 98f5bd80ddc..55367d766be 100644
--- a/src/mongo/db/pipeline/accumulator_std_dev.cpp
+++ b/src/mongo/db/pipeline/accumulator_std_dev.cpp
@@ -40,8 +40,8 @@
namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(stdDevPop, AccumulatorStdDevPop::create);
-REGISTER_ACCUMULATOR(stdDevSamp, AccumulatorStdDevSamp::create);
+REGISTER_ACCUMULATOR(stdDevPop, genericParseSingleExpressionAccumulator<AccumulatorStdDevPop>);
+REGISTER_ACCUMULATOR(stdDevSamp, genericParseSingleExpressionAccumulator<AccumulatorStdDevSamp>);
REGISTER_EXPRESSION(stdDevPop, ExpressionFromAccumulator<AccumulatorStdDevPop>::parse);
REGISTER_EXPRESSION(stdDevSamp, ExpressionFromAccumulator<AccumulatorStdDevSamp>::parse);
diff --git a/src/mongo/db/pipeline/accumulator_sum.cpp b/src/mongo/db/pipeline/accumulator_sum.cpp
index 81bc6c6af3e..cd1d4a2eabe 100644
--- a/src/mongo/db/pipeline/accumulator_sum.cpp
+++ b/src/mongo/db/pipeline/accumulator_sum.cpp
@@ -43,7 +43,7 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_ACCUMULATOR(sum, AccumulatorSum::create);
+REGISTER_ACCUMULATOR(sum, genericParseSingleExpressionAccumulator<AccumulatorSum>);
REGISTER_EXPRESSION(sum, ExpressionFromAccumulator<AccumulatorSum>::parse);
const char* AccumulatorSum::getOpName() const {
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 26cb0e9a89b..eba38c5c58b 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -311,7 +311,7 @@ DocumentSourceBucketAuto::Bucket::Bucket(
: _min(min), _max(max) {
_accums.reserve(accumulationStatements.size());
for (auto&& accumulationStatement : accumulationStatements) {
- _accums.push_back(accumulationStatement.makeAccumulator(expCtx));
+ _accums.push_back(accumulationStatement.makeAccumulator());
}
}
@@ -382,7 +382,7 @@ Value DocumentSourceBucketAuto::serialize(
MutableDocument outputSpec(_accumulatedFields.size());
for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx);
+ intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator();
outputSpec[accumulatedField.fieldName] =
Value{Document{{accum->getOpName(),
accumulatedField.expression->serialize(static_cast<bool>(explain))}}};
@@ -407,7 +407,7 @@ intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create(
if (accumulationStatements.empty()) {
accumulationStatements.emplace_back("count",
ExpressionConstant::create(pExpCtx, Value(1)),
- AccumulationStatement::getFactory("$sum"));
+ [pExpCtx] { return AccumulatorSum::create(pExpCtx); });
}
return new DocumentSourceBucketAuto(pExpCtx,
groupByExpression,
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index e15c3e48382..4f04626c341 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -242,7 +242,7 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
// Add the remaining fields.
for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx);
+ intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator();
insides[accumulatedField.fieldName] =
Value(DOC(accum->getOpName()
<< accumulatedField.expression->serialize(static_cast<bool>(explain))));
@@ -497,7 +497,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// Add the accumulators
group.reserve(numAccumulators);
for (auto&& accumulatedField : _accumulatedFields) {
- group.push_back(accumulatedField.makeAccumulator(pExpCtx));
+ group.push_back(accumulatedField.makeAccumulator());
}
} else {
for (auto&& groupObj : group) {
@@ -557,7 +557,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// prepare current to accumulate data
_currentAccumulators.reserve(numAccumulators);
for (auto&& accumulatedField : _accumulatedFields) {
- _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx));
+ _currentAccumulators.push_back(accumulatedField.makeAccumulator());
}
verify(_sorterIterator->more()); // we put data in, we should get something out.
@@ -769,7 +769,7 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
// We can't do this transformation if there are any non-$first accumulators.
for (auto&& accumulator : _accumulatedFields) {
if (AccumulatorDocumentsNeeded::kFirstDocument !=
- accumulator.makeAccumulator(pExpCtx)->documentsNeeded()) {
+ accumulator.makeAccumulator()->documentsNeeded()) {
return nullptr;
}
}