summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/document_source_group.cpp
diff options
context:
space:
mode:
author: David Percy <david.percy@mongodb.com> 2020-01-17 16:20:06 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-02-27 20:44:41 +0000
commit: 606fbf8eac896b0b4ed26e921b7f6bf1f73f5511 (patch)
tree: 4855ab6890e429ff79ffdf867d2b973361b62b00 /src/mongo/db/pipeline/document_source_group.cpp
parent: 5e57c0b0f7505035c37179d100fdd43ef2b6cc36 (diff)
download: mongo-606fbf8eac896b0b4ed26e921b7f6bf1f73f5511.tar.gz
SERVER-45447 Add $accumulator for user-defined Javascript accumulators
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- src/mongo/db/pipeline/document_source_group.cpp | 45
1 file changed, 35 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 0f961f8c363..8268f0a1dd1 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -158,6 +158,17 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() {
_currentId = _firstPartOfNextGroup.first;
const size_t numAccumulators = _accumulatedFields.size();
+
+ // Call startNewGroup on every accumulator.
+ Value expandedId = expandId(_currentId);
+ Document idDoc =
+ expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
+ for (size_t i = 0; i < numAccumulators; ++i) {
+ Value initializerValue =
+ _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables);
+ _currentAccumulators[i]->startNewGroup(initializerValue);
+ }
+
while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) {
// Inside of this loop, _firstPartOfNextGroup is the current data being processed.
// At loop exit, it is the first value to be processed in the next group.
@@ -216,7 +227,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
}
for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expression = accumulatedField.expression->optimize();
+ accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize();
+ accumulatedField.expr.argument = accumulatedField.expr.argument->optimize();
}
return this;
@@ -241,9 +253,11 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
// Add the remaining fields.
for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator();
+ intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator();
insides[accumulatedField.fieldName] =
- Value(accum->serialize(accumulatedField.expression, static_cast<bool>(explain)));
+ Value(accum->serialize(accumulatedField.expr.initializer,
+ accumulatedField.expr.argument,
+ static_cast<bool>(explain)));
}
if (_doingMerge) {
@@ -263,7 +277,8 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const
// add the rest
for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expression->addDependencies(deps);
+ accumulatedField.expr.argument->addDependencies(deps);
+ // Don't add initializer, because it doesn't refer to docs from the input stream.
}
return DepsTracker::State::EXHAUSTIVE_ALL;
@@ -485,16 +500,23 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and
// looking it up in '_groups' multiple times.
const size_t oldSize = _groups->size();
- vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id];
+ vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id];
const bool inserted = _groups->size() != oldSize;
if (inserted) {
_memoryUsageBytes += id.getApproximateSize();
- // Add the accumulators
+ // Initialize and add the accumulators
+ Value expandedId = expandId(id);
+ Document idDoc =
+ expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
group.reserve(numAccumulators);
for (auto&& accumulatedField : _accumulatedFields) {
- group.push_back(accumulatedField.makeAccumulator());
+ auto accum = accumulatedField.makeAccumulator();
+ Value initializerValue =
+ accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables);
+ accum->startNewGroup(initializerValue);
+ group.push_back(accum);
}
} else {
for (auto&& groupObj : group) {
@@ -508,7 +530,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
for (size_t i = 0; i < numAccumulators; i++) {
group[i]->process(
- _accumulatedFields[i].expression->evaluate(rootDocument, &pExpCtx->variables),
+ _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
_doingMerge);
_memoryUsageBytes += group[i]->memUsageForSorter();
@@ -693,7 +715,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr
// original accumulator may be collecting an expression based on a field expression or
// constant. Here, we accumulate the output of the same name from the prior group.
auto copiedAccumulatedField = accumulatedField;
- copiedAccumulatedField.expression =
+ copiedAccumulatedField.expr.argument =
ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + copiedAccumulatedField.fieldName, vps);
mergingGroup->addAccumulator(copiedAccumulatedField);
}
@@ -775,7 +797,10 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
fields.push_back(std::make_pair("_id", ExpressionFieldPath::create(pExpCtx, groupId)));
for (auto&& accumulator : _accumulatedFields) {
- fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expression));
+ fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument));
+
+ // Since we don't attempt this transformation for non-$first accumulators,
+ // the initializer should always be trivial.
}
return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields));