diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/accumulation_statement.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulation_statement.h | 21 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 86 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_test.cpp | 16 |
7 files changed, 97 insertions, 106 deletions
diff --git a/src/mongo/db/pipeline/accumulation_statement.cpp b/src/mongo/db/pipeline/accumulation_statement.cpp index 9ac394b0018..5d06435911f 100644 --- a/src/mongo/db/pipeline/accumulation_statement.cpp +++ b/src/mongo/db/pipeline/accumulation_statement.cpp @@ -63,6 +63,11 @@ Accumulator::Factory AccumulationStatement::getFactory(StringData name) { return it->second; } +boost::intrusive_ptr<Accumulator> AccumulationStatement::makeAccumulator( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const { + return _factory(expCtx); +} + AccumulationStatement AccumulationStatement::parseAccumulationStatement( const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONElement& elem, @@ -92,8 +97,8 @@ AccumulationStatement AccumulationStatement::parseAccumulationStatement( specElem.type() != BSONType::Array); return {fieldName.toString(), - AccumulationStatement::getFactory(accName), - Expression::parseOperand(expCtx, specElem, vps)}; + Expression::parseOperand(expCtx, specElem, vps), + AccumulationStatement::getFactory(accName)}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulation_statement.h b/src/mongo/db/pipeline/accumulation_statement.h index 92f7906ed9e..91a58e3a8e2 100644 --- a/src/mongo/db/pipeline/accumulation_statement.h +++ b/src/mongo/db/pipeline/accumulation_statement.h @@ -54,17 +54,15 @@ namespace mongo { * A class representing a user-specified accummulation, including the field name to put the * accumulated result in, which accumulator to use, and the expression used to obtain the input to * the Accumulator. - * - * TODO SERVER-25980: Do not expose 'factory', instead add a makeAccumulator() method. */ class AccumulationStatement { public: AccumulationStatement(std::string fieldName, - Accumulator::Factory factory, - boost::intrusive_ptr<Expression> expression) + boost::intrusive_ptr<Expression> expression, + Accumulator::Factory factory) : fieldName(std::move(fieldName)), - factory(std::move(factory)), - expression(std::move(expression)) {} + expression(std::move(expression)), + _factory(std::move(factory)) {} /** * Parses a BSONElement that is an accumulated field, and returns an AccumulationStatement for @@ -93,9 +91,18 @@ public: */ static Accumulator::Factory getFactory(StringData name); + // The field name is used to store the results of the accumulation in a result document. std::string fieldName; - Accumulator::Factory factory; + + // The expression to use to obtain the input to the accumulator. boost::intrusive_ptr<Expression> expression; + + // Constructs an Accumulator to do actual accumulation. + boost::intrusive_ptr<Accumulator> makeAccumulator( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const; + +private: + Accumulator::Factory _factory; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 3bd8ecc837f..e1385b19c68 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -77,8 +77,8 @@ DocumentSource::GetDepsReturn DocumentSourceBucketAuto::getDependencies(DepsTrac _groupByExpression->addDependencies(deps); // Add the 'output' fields. - for (auto&& exp : _expressions) { - exp->addDependencies(deps); + for (auto&& accumulatedField : _accumulatedFields) { + accumulatedField.expression->addDependencies(deps); } // We know exactly which fields will be present in the output document. Future stages cannot @@ -149,9 +149,9 @@ void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>& invariant(pExpCtx->getValueComparator().evaluate(entry.first >= bucket._max)); bucket._max = entry.first; - const size_t numAccumulators = _accumulatorFactories.size(); + const size_t numAccumulators = _accumulatedFields.size(); for (size_t k = 0; k < numAccumulators; k++) { - bucket._accums[k]->process(_expressions[k]->evaluate(entry.second), false); + bucket._accums[k]->process(_accumulatedFields[k].expression->evaluate(entry.second), false); } } @@ -194,8 +194,7 @@ void DocumentSourceBucketAuto::populateBuckets() { } // Initialize the current bucket. - Bucket currentBucket( - pExpCtx, currentValue.first, currentValue.first, _accumulatorFactories); + Bucket currentBucket(pExpCtx, currentValue.first, currentValue.first, _accumulatedFields); // Add the first value into the current bucket. addDocumentToBucket(currentValue, currentBucket); @@ -266,14 +265,15 @@ void DocumentSourceBucketAuto::populateBuckets() { } } -DocumentSourceBucketAuto::Bucket::Bucket(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Value min, - Value max, - vector<Accumulator::Factory> accumulatorFactories) +DocumentSourceBucketAuto::Bucket::Bucket( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Value min, + Value max, + const vector<AccumulationStatement>& accumulationStatements) : _min(min), _max(max) { - _accums.reserve(accumulatorFactories.size()); - for (auto&& factory : accumulatorFactories) { - _accums.push_back(factory(expCtx)); + _accums.reserve(accumulationStatements.size()); + for (auto&& accumulationStatement : accumulationStatements) { + _accums.push_back(accumulationStatement.makeAccumulator(expCtx)); } } @@ -309,7 +309,7 @@ void DocumentSourceBucketAuto::addBucket(Bucket& newBucket) { } Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) { - const size_t nAccumulatedFields = _fieldNames.size(); + const size_t nAccumulatedFields = _accumulatedFields.size(); MutableDocument out(1 + nAccumulatedFields); out.addField("_id", Value{Document{{"min", bucket._min}, {"max", bucket._max}}}); @@ -320,7 +320,8 @@ Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) { // To be consistent with the $group stage, we consider "missing" to be equivalent to null // when evaluating accumulators. - out.addField(_fieldNames[i], val.missing() ? Value(BSONNULL) : std::move(val)); + out.addField(_accumulatedFields[i].fieldName, + val.missing() ? Value(BSONNULL) : std::move(val)); } return out.freeze(); } @@ -341,12 +342,12 @@ Value DocumentSourceBucketAuto::serialize( insides["granularity"] = Value(_granularityRounder->getName()); } - const size_t nOutputFields = _fieldNames.size(); - MutableDocument outputSpec(nOutputFields); - for (size_t i = 0; i < nOutputFields; i++) { - intrusive_ptr<Accumulator> accum = _accumulatorFactories[i](pExpCtx); - outputSpec[_fieldNames[i]] = Value{ - Document{{accum->getOpName(), _expressions[i]->serialize(static_cast<bool>(explain))}}}; + MutableDocument outputSpec(_accumulatedFields.size()); + for (auto&& accumulatedField : _accumulatedFields) { + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx); + outputSpec[accumulatedField.fieldName] = + Value{Document{{accum->getOpName(), + accumulatedField.expression->serialize(static_cast<bool>(explain))}}}; } insides["output"] = outputSpec.freezeToValue(); @@ -367,8 +368,8 @@ intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create( // If there is no output field specified, then add the default one. if (accumulationStatements.empty()) { accumulationStatements.emplace_back("count", - AccumulationStatement::getFactory("$sum"), - ExpressionConstant::create(pExpCtx, Value(1))); + ExpressionConstant::create(pExpCtx, Value(1)), + AccumulationStatement::getFactory("$sum")); } return new DocumentSourceBucketAuto(pExpCtx, groupByExpression, @@ -393,9 +394,7 @@ DocumentSourceBucketAuto::DocumentSourceBucketAuto( invariant(!accumulationStatements.empty()); for (auto&& accumulationStatement : accumulationStatements) { - _fieldNames.push_back(std::move(accumulationStatement.fieldName)); - _accumulatorFactories.push_back(accumulationStatement.factory); - _expressions.push_back(accumulationStatement.expression); + _accumulatedFields.push_back(accumulationStatement); } } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index a60bf5cb752..2a40b092c1f 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -96,7 +96,7 @@ private: Bucket(const boost::intrusive_ptr<ExpressionContext>& expCtx, Value min, Value max, - std::vector<Accumulator::Factory> accumulatorFactories); + const std::vector<AccumulationStatement>& accumulationStatements); Value _min; Value _max; std::vector<boost::intrusive_ptr<Accumulator>> _accums; @@ -139,13 +139,7 @@ private: std::unique_ptr<Sorter<Value, Document>> _sorter; std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput; - // _fieldNames contains the field names for the result documents, _accumulatorFactories contains - // the accumulator factories for the result documents, and _expressions contains the common - // expressions used by each instance of each accumulator in order to find the right-hand side of - // what gets added to the accumulator. These three vectors parallel each other. - std::vector<std::string> _fieldNames; - std::vector<Accumulator::Factory> _accumulatorFactories; - std::vector<boost::intrusive_ptr<Expression>> _expressions; + std::vector<AccumulationStatement> _accumulatedFields; int _nBuckets; uint64_t _maxMemoryUsageBytes; diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 8f13136f4fe..84e033b332e 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -85,7 +85,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() { return GetNextResult::makeEOF(); _currentId = _firstPartOfNextGroup.first; - const size_t numAccumulators = vpAccumulatorFactory.size(); + const size_t numAccumulators = _accumulatedFields.size(); while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) { // Inside of this loop, _firstPartOfNextGroup is the current data being processed. // At loop exit, it is the first value to be processed in the next group. @@ -140,8 +140,9 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() { do { // Add to the current accumulator(s). for (size_t i = 0; i < _currentAccumulators.size(); i++) { - _currentAccumulators[i]->process(vpExpression[i]->evaluate(*_firstDocOfNextGroup), - _doingMerge); + _currentAccumulators[i]->process( + _accumulatedFields[i].expression->evaluate(*_firstDocOfNextGroup), _doingMerge); + } // Retrieve the next document. @@ -183,8 +184,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { _idExpressions[i] = _idExpressions[i]->optimize(); } - for (size_t i = 0; i < vFieldName.size(); i++) { - vpExpression[i] = vpExpression[i]->optimize(); + for (auto&& accumulatedField : _accumulatedFields) { + accumulatedField.expression = accumulatedField.expression->optimize(); } return this; @@ -208,11 +209,11 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> } // Add the remaining fields. - const size_t n = vFieldName.size(); - for (size_t i = 0; i < n; ++i) { - intrusive_ptr<Accumulator> accum = vpAccumulatorFactory[i](pExpCtx); - insides[vFieldName[i]] = Value( - DOC(accum->getOpName() << vpExpression[i]->serialize(static_cast<bool>(explain)))); + for (auto&& accumulatedField : _accumulatedFields) { + intrusive_ptr<Accumulator> accum = accumulatedField.makeAccumulator(pExpCtx); + insides[accumulatedField.fieldName] = + Value(DOC(accum->getOpName() + << accumulatedField.expression->serialize(static_cast<bool>(explain)))); } if (_doingMerge) { @@ -234,9 +235,8 @@ DocumentSource::GetDepsReturn DocumentSourceGroup::getDependencies(DepsTracker* } // add the rest - const size_t n = vFieldName.size(); - for (size_t i = 0; i < n; ++i) { - vpExpression[i]->addDependencies(deps); + for (auto&& accumulatedField : _accumulatedFields) { + accumulatedField.expression->addDependencies(deps); } return EXHAUSTIVE_ALL; @@ -270,9 +270,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { - vFieldName.push_back(accumulationStatement.fieldName); - vpAccumulatorFactory.push_back(accumulationStatement.factory); - vpExpression.push_back(accumulationStatement.expression); + _accumulatedFields.push_back(accumulationStatement); } namespace { @@ -452,7 +450,7 @@ void getFieldPathListForSpilled(ExpressionObject* expressionObj, } // namespace DocumentSource::GetNextResult DocumentSourceGroup::initialize() { - const size_t numAccumulators = vpAccumulatorFactory.size(); + const size_t numAccumulators = _accumulatedFields.size(); boost::optional<BSONObj> inputSort = findRelevantInputSort(); if (inputSort) { @@ -462,8 +460,8 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // Set up accumulators. _currentAccumulators.reserve(numAccumulators); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators.push_back(vpAccumulatorFactory[i](pExpCtx)); + for (auto&& accumulatedField : _accumulatedFields) { + _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx)); } // We only need to load the first document. @@ -480,7 +478,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { return DocumentSource::GetNextResult::makeEOF(); } - dassert(numAccumulators == vpExpression.size()); // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'. GetNextResult input = pSource->getNext(); @@ -508,13 +505,13 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // Add the accumulators group.reserve(numAccumulators); - for (size_t i = 0; i < numAccumulators; i++) { - group.push_back(vpAccumulatorFactory[i](pExpCtx)); + for (auto&& accumulatedField : _accumulatedFields) { + group.push_back(accumulatedField.makeAccumulator(pExpCtx)); } } else { - for (size_t i = 0; i < numAccumulators; i++) { + for (auto&& groupObj : group) { // subtract old mem usage. New usage added back after processing. - _memoryUsageBytes -= group[i]->memUsageForSorter(); + _memoryUsageBytes -= groupObj->memUsageForSorter(); } } @@ -522,7 +519,9 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { dassert(numAccumulators == group.size()); for (size_t i = 0; i < numAccumulators; i++) { - group[i]->process(vpExpression[i]->evaluate(input.getDocument()), _doingMerge); + group[i]->process(_accumulatedFields[i].expression->evaluate(input.getDocument()), + _doingMerge); + _memoryUsageBytes += group[i]->memUsageForSorter(); } @@ -561,8 +560,8 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // prepare current to accumulate data _currentAccumulators.reserve(numAccumulators); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators.push_back(vpAccumulatorFactory[i](pExpCtx)); + for (auto&& accumulatedField : _accumulatedFields) { + _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx)); } verify(_sorterIterator->more()); // we put data in, we should get something out. @@ -591,8 +590,8 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir)); - switch (vpAccumulatorFactory.size()) { // same as ptrs[i]->second.size() for all i. - case 0: // no values, essentially a distinct + switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i. + case 0: // no values, essentially a distinct for (size_t i = 0; i < ptrs.size(); i++) { writer.addAlreadySorted(ptrs[i]->first, Value()); } @@ -807,7 +806,7 @@ Value DocumentSourceGroup::expandId(const Value& val) { Document DocumentSourceGroup::makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput) { - const size_t n = vFieldName.size(); + const size_t n = _accumulatedFields.size(); MutableDocument out(1 + n); /* add the _id field */ @@ -818,9 +817,9 @@ Document DocumentSourceGroup::makeDocument(const Value& id, Value val = accums[i]->getValue(mergeableOutput); if (val.missing()) { // we return null in this case so return objects are predictable - out.addField(vFieldName[i], Value(BSONNULL)); + out.addField(_accumulatedFields[i].fieldName, Value(BSONNULL)); } else { - out.addField(vFieldName[i], val); + out.addField(_accumulatedFields[i].fieldName, val); } } @@ -839,20 +838,15 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::getMergeSource() { /* the merger will use the same grouping key */ pMerger->setIdExpression(ExpressionFieldPath::parse(pExpCtx, "$$ROOT._id", vps)); - const size_t n = vFieldName.size(); - for (size_t i = 0; i < n; ++i) { - /* - The merger's output field names will be the same, as will the - accumulator factories. However, for some accumulators, the - expression to be accumulated will be different. The original - accumulator may be collecting an expression based on a field - expression or constant. Here, we accumulate the output of the - same name from the prior group. - */ - pMerger->addAccumulator( - {vFieldName[i], - vpAccumulatorFactory[i], - ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + vFieldName[i], vps)}); + for (auto&& accumulatedField : _accumulatedFields) { + // The merger's output field names will be the same, as will the accumulator factories. + // However, for some accumulators, the expression to be accumulated will be different. The + // original accumulator may be collecting an expression based on a field expression or + // constant. Here, we accumulate the output of the same name from the prior group. + auto copiedAccumuledField = accumulatedField; + copiedAccumuledField.expression = + ExpressionFieldPath::parse(pExpCtx, "$$ROOT." + accumulatedField.fieldName, vps); + pMerger->addAccumulator(copiedAccumuledField); } return pMerger; diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 9fdcd40b373..d1c0f079234 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -148,15 +148,7 @@ private: */ Value expandId(const Value& val); - /** - * 'vFieldName' contains the field names for the result documents, 'vpAccumulatorFactory' - * contains the accumulator factories for the result documents, and 'vpExpression' contains the - * common expressions used by each instance of each accumulator in order to find the right-hand - * side of what gets added to the accumulator. These three vectors parallel each other. - */ - std::vector<std::string> vFieldName; - std::vector<Accumulator::Factory> vpAccumulatorFactory; - std::vector<boost::intrusive_ptr<Expression>> vpExpression; + std::vector<AccumulationStatement> _accumulatedFields; bool _doingMerge; size_t _memoryUsageBytes = 0; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 53234769feb..281dd32219d 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -73,8 +73,8 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoading) { expCtx->inRouter = true; // Disallow external sort. // This is the only way to do this in a debug build. AccumulationStatement countStatement{"count", - AccumulationStatement::getFactory("$sum"), - ExpressionConstant::create(expCtx, Value(1))}; + ExpressionConstant::create(expCtx, Value(1)), + AccumulationStatement::getFactory("$sum")}; auto group = DocumentSourceGroup::create( expCtx, ExpressionConstant::create(expCtx, Value(BSONNULL)), {countStatement}); auto mock = DocumentSourceMock::create({DocumentSource::GetNextResult::makePauseExecution(), @@ -108,8 +108,8 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { VariablesParseState vps = expCtx->variablesParseState; AccumulationStatement pushStatement{"spaceHog", - AccumulationStatement::getFactory("$push"), - ExpressionFieldPath::parse(expCtx, "$largeStr", vps)}; + ExpressionFieldPath::parse(expCtx, "$largeStr", vps), + AccumulationStatement::getFactory("$push")}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", vps); auto group = DocumentSourceGroup::create( expCtx, groupByExpression, {pushStatement}, maxMemoryUsageBytes); @@ -147,8 +147,8 @@ TEST_F(DocumentSourceGroupTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSet VariablesParseState vps = expCtx->variablesParseState; AccumulationStatement pushStatement{"spaceHog", - AccumulationStatement::getFactory("$push"), - ExpressionFieldPath::parse(expCtx, "$largeStr", vps)}; + ExpressionFieldPath::parse(expCtx, "$largeStr", vps), + AccumulationStatement::getFactory("$push")}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", vps); auto group = DocumentSourceGroup::create( expCtx, groupByExpression, {pushStatement}, maxMemoryUsageBytes); @@ -169,8 +169,8 @@ TEST_F(DocumentSourceGroupTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { VariablesParseState vps = expCtx->variablesParseState; AccumulationStatement pushStatement{"spaceHog", - AccumulationStatement::getFactory("$push"), - ExpressionFieldPath::parse(expCtx, "$largeStr", vps)}; + ExpressionFieldPath::parse(expCtx, "$largeStr", vps), + AccumulationStatement::getFactory("$push")}; auto groupByExpression = ExpressionFieldPath::parse(expCtx, "$_id", vps); auto group = DocumentSourceGroup::create( expCtx, groupByExpression, {pushStatement}, maxMemoryUsageBytes); |