diff options
author | David Percy <david.percy@mongodb.com> | 2020-01-17 16:20:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-27 20:44:41 +0000 |
commit | 606fbf8eac896b0b4ed26e921b7f6bf1f73f5511 (patch) | |
tree | 4855ab6890e429ff79ffdf867d2b973361b62b00 /src/mongo/db/pipeline/document_source_group.cpp | |
parent | 5e57c0b0f7505035c37179d100fdd43ef2b6cc36 (diff) | |
download | mongo-606fbf8eac896b0b4ed26e921b7f6bf1f73f5511.tar.gz |
SERVER-45447 Add $accumulator for user-defined JavaScript accumulators
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 45 |
1 file changed, 35 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 0f961f8c363..8268f0a1dd1 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -158,6 +158,17 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() { _currentId = _firstPartOfNextGroup.first; const size_t numAccumulators = _accumulatedFields.size(); + + // Call startNewGroup on every accumulator. + Value expandedId = expandId(_currentId); + Document idDoc = + expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); + for (size_t i = 0; i < numAccumulators; ++i) { + Value initializerValue = + _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables); + _currentAccumulators[i]->startNewGroup(initializerValue); + } + while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) { // Inside of this loop, _firstPartOfNextGroup is the current data being processed. // At loop exit, it is the first value to be processed in the next group. @@ -216,7 +227,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { } for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expression = accumulatedField.expression->optimize(); + accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize(); + accumulatedField.expr.argument = accumulatedField.expr.argument->optimize(); } return this; @@ -241,9 +253,11 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> // Add the remaining fields. 
for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(); + intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator(); insides[accumulatedField.fieldName] = - Value(accum->serialize(accumulatedField.expression, static_cast<bool>(explain))); + Value(accum->serialize(accumulatedField.expr.initializer, + accumulatedField.expr.argument, + static_cast<bool>(explain))); } if (_doingMerge) { @@ -263,7 +277,8 @@ DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const // add the rest for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expression->addDependencies(deps); + accumulatedField.expr.argument->addDependencies(deps); + // Don't add initializer, because it doesn't refer to docs from the input stream. } return DepsTracker::State::EXHAUSTIVE_ALL; @@ -485,16 +500,23 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and // looking it up in '_groups' multiple times. const size_t oldSize = _groups->size(); - vector<intrusive_ptr<Accumulator>>& group = (*_groups)[id]; + vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id]; const bool inserted = _groups->size() != oldSize; if (inserted) { _memoryUsageBytes += id.getApproximateSize(); - // Add the accumulators + // Initialize and add the accumulators + Value expandedId = expandId(id); + Document idDoc = + expandedId.getType() == BSONType::Object ? 
expandedId.getDocument() : Document(); group.reserve(numAccumulators); for (auto&& accumulatedField : _accumulatedFields) { - group.push_back(accumulatedField.makeAccumulator()); + auto accum = accumulatedField.makeAccumulator(); + Value initializerValue = + accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables); + accum->startNewGroup(initializerValue); + group.push_back(accum); } } else { for (auto&& groupObj : group) { @@ -508,7 +530,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { for (size_t i = 0; i < numAccumulators; i++) { group[i]->process( - _accumulatedFields[i].expression->evaluate(rootDocument, &pExpCtx->variables), + _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables), _doingMerge); _memoryUsageBytes += group[i]->memUsageForSorter(); @@ -693,7 +715,7 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distr // original accumulator may be collecting an expression based on a field expression or // constant. Here, we accumulate the output of the same name from the prior group. auto copiedAccumulatedField = accumulatedField; - copiedAccumulatedField.expression = + copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + copiedAccumulatedField.fieldName, vps); mergingGroup->addAccumulator(copiedAccumulatedField); } @@ -775,7 +797,10 @@ DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { fields.push_back(std::make_pair("_id", ExpressionFieldPath::create(pExpCtx, groupId))); for (auto&& accumulator : _accumulatedFields) { - fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expression)); + fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument)); + + // Since we don't attempt this transformation for non-$first accumulators, + // the initializer should always be trivial. } return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields)); |