diff options
author | Ivan Fefer <ivan.fefer@mongodb.com> | 2022-11-18 10:48:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-18 11:17:40 +0000 |
commit | 4cfd9b936b68622274e39100b7859ea8eb089ad8 (patch) | |
tree | 1a2c3a89c9e6cf8fece70d2f0cc9b7a5c72e610e | |
parent | 092b2eec8182bf540d73bd77649617ad2a36300d (diff) | |
download | mongo-4cfd9b936b68622274e39100b7859ea8eb089ad8.tar.gz |
SERVER-70267 Add DocumentSourceStreamingGroup
-rw-r--r-- | jstests/aggregation/group_conversion_to_distinct_scan.js | 25 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 870 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 268 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_base.cpp | 821 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_base.h | 267 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group_test.cpp | 361 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_streaming_group.cpp | 256 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_streaming_group.h | 114 | ||||
-rw-r--r-- | src/mongo/db/pipeline/group_from_first_document_transformation.cpp | 93 | ||||
-rw-r--r-- | src/mongo/db/pipeline/group_from_first_document_transformation.h | 94 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 8 |
12 files changed, 2078 insertions, 1102 deletions
diff --git a/jstests/aggregation/group_conversion_to_distinct_scan.js b/jstests/aggregation/group_conversion_to_distinct_scan.js index 95a55fec612..ba2f55ec64b 100644 --- a/jstests/aggregation/group_conversion_to_distinct_scan.js +++ b/jstests/aggregation/group_conversion_to_distinct_scan.js @@ -42,7 +42,7 @@ function createIndexes() { } createIndexes(); -assert.commandWorked(coll.insert([ +const documents = [ {_id: 0, a: 1, b: 1, c: 1}, {_id: 1, a: 1, b: 2, c: 2}, {_id: 2, a: 1, b: 2, c: 3}, @@ -71,7 +71,8 @@ assert.commandWorked(coll.insert([ {_id: 21, str: "FoO", d: 2}, {_id: 22, str: "bar", d: 4}, {_id: 23, str: "bAr", d: 3} -])); +]; +assert.commandWorked(coll.insert(documents)); // Helper for dropping an index and removing it from the list of indexes. function removeIndex(pattern) { @@ -741,4 +742,24 @@ assertResultsMatchWithAndWithoutHintandIndexes( explain = coll.explain().aggregate(pipeline, collationOption); assert.neq(null, getAggPlanStage(explain, "DISTINCT_SCAN"), explain); assert.eq({str: 1, d: 1}, getAggPlanStage(explain, "DISTINCT_SCAN").keyPattern); + +// +// Verify that a $sort-$_internalStreamingGroup pipeline can use DISTINCT_SCAN +// +pipeline = [ + {$sort: {_id: 1}}, + {$_internalStreamingGroup: {_id: "$_id", value: {$first: "$a"}, $monotonicIdFields: ["_id"]}} +]; +const expectedResult = []; +for (let i = 0; i <= 23; i++) { + let resultDocument = {_id: i, value: null}; + if (documents[i].hasOwnProperty("a")) { + resultDocument["value"] = documents[i].a; + } + expectedResult.push(resultDocument); +} +assertResultsMatchWithAndWithoutHintandIndexes(pipeline, expectedResult); +explain = coll.explain().aggregate(pipeline); +assert.neq(null, getAggPlanStage(explain, "DISTINCT_SCAN"), explain); +assert.eq({_id: 1}, getAggPlanStage(explain, "DISTINCT_SCAN").keyPattern); }()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 522dff15844..d3f93a2da8a 100644 --- a/src/mongo/db/pipeline/SConscript +++ 
b/src/mongo/db/pipeline/SConscript @@ -263,6 +263,7 @@ pipelineEnv.Library( 'document_source_geo_near.cpp', 'document_source_graph_lookup.cpp', 'document_source_group.cpp', + 'document_source_group_base.cpp', 'document_source_index_stats.cpp', 'document_source_internal_all_collection_stats.cpp', 'document_source_internal_compute_geo_near_distance.cpp', @@ -296,10 +297,12 @@ pipelineEnv.Library( 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_sort_by_count.cpp', + 'document_source_streaming_group.cpp', 'document_source_tee_consumer.cpp', 'document_source_telemetry.cpp', 'document_source_union_with.cpp', 'document_source_unwind.cpp', + 'group_from_first_document_transformation.cpp', 'pipeline.cpp', 'search_helper.cpp', 'semantic_analysis.cpp', diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 13694a79a60..15d208ea252 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -46,88 +46,6 @@ namespace mongo { -namespace { - -/** - * Generates a new file name on each call using a static, atomic and monotonically increasing - * number. - * - * Each user of the Sorter must implement this function to ensure that all temporary files that the - * Sorter instances produce are uniquely identified using a unique file name extension with separate - * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple - * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. - */ -std::string nextFileName() { - static AtomicWord<unsigned> documentSourceGroupFileCounter; - return "extsort-doc-group." 
+ std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1)); -} - -} // namespace - -using boost::intrusive_ptr; -using std::pair; -using std::shared_ptr; -using std::vector; - -Document GroupFromFirstDocumentTransformation::applyTransformation(const Document& input) { - MutableDocument output(_accumulatorExprs.size()); - - for (auto&& expr : _accumulatorExprs) { - auto value = expr.second->evaluate(input, &expr.second->getExpressionContext()->variables); - output.addField(expr.first, value.missing() ? Value(BSONNULL) : std::move(value)); - } - - return output.freeze(); -} - -void GroupFromFirstDocumentTransformation::optimize() { - for (auto&& expr : _accumulatorExprs) { - expr.second = expr.second->optimize(); - } -} - -Document GroupFromFirstDocumentTransformation::serializeTransformation( - boost::optional<ExplainOptions::Verbosity> explain) const { - - MutableDocument newRoot(_accumulatorExprs.size()); - for (auto&& expr : _accumulatorExprs) { - newRoot.addField(expr.first, expr.second->serialize(static_cast<bool>(explain))); - } - - return {{"newRoot", newRoot.freezeToValue()}}; -} - -DepsTracker::State GroupFromFirstDocumentTransformation::addDependencies(DepsTracker* deps) const { - for (auto&& expr : _accumulatorExprs) { - expression::addDependencies(expr.second.get(), deps); - } - - // This stage will replace the entire document with a new document, so any existing fields - // will be replaced and cannot be required as dependencies. We use EXHAUSTIVE_ALL here - // instead of EXHAUSTIVE_FIELDS, as in ReplaceRootTransformation, because the stages that - // follow a $group stage should not depend on document metadata. 
- return DepsTracker::State::EXHAUSTIVE_ALL; -} - -void GroupFromFirstDocumentTransformation::addVariableRefs(std::set<Variables::Id>* refs) const { - for (auto&& expr : _accumulatorExprs) { - expression::addVariableRefs(expr.second.get(), refs); - } -} - -DocumentSource::GetModPathsReturn GroupFromFirstDocumentTransformation::getModifiedPaths() const { - // Replaces the entire root, so all paths are modified. - return {DocumentSource::GetModPathsReturn::Type::kAllPaths, OrderedPathSet{}, {}}; -} - -std::unique_ptr<GroupFromFirstDocumentTransformation> GroupFromFirstDocumentTransformation::create( - const intrusive_ptr<ExpressionContext>& expCtx, - const std::string& groupId, - vector<pair<std::string, intrusive_ptr<Expression>>> accumulatorExprs) { - return std::make_unique<GroupFromFirstDocumentTransformation>(groupId, - std::move(accumulatorExprs)); -} - constexpr StringData DocumentSourceGroup::kStageName; REGISTER_DOCUMENT_SOURCE(group, @@ -139,527 +57,77 @@ const char* DocumentSourceGroup::getSourceName() const { return kStageName.rawData(); } -bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory() { - if (!_memoryTracker._allowDiskUse && - (_memoryTracker.currentMemoryBytes() > - static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes))) { - freeMemory(); - } - - if (_memoryTracker.currentMemoryBytes() > - static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) { - uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - "Exceeded memory limit for $group, but didn't allow external sort." - " Pass allowDiskUse:true to opt in.", - _memoryTracker._allowDiskUse); - _memoryTracker.resetCurrent(); - return true; - } - return false; -} - -void DocumentSourceGroup::freeMemory() { - invariant(_groups); - for (auto&& group : *_groups) { - for (size_t i = 0; i < group.second.size(); i++) { - // Subtract the current usage. 
- _memoryTracker.update(_accumulatedFields[i].fieldName, - -1 * group.second[i]->getMemUsage()); - - group.second[i]->reduceMemoryConsumptionIfAble(); - - // Update the memory usage for this AccumulationStatement. - _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage()); - } - } -} - -DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() { - if (!_initialized) { - const auto initializationResult = initialize(); - if (initializationResult.isPaused()) { - return initializationResult; - } - invariant(initializationResult.isEOF()); - } - - for (auto&& accum : _currentAccumulators) { - accum->reset(); // Prep accumulators for a new group. - } - - if (_spilled) { - return getNextSpilled(); - } else { - return getNextStandard(); - } -} - -DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() { - // We aren't streaming, and we have spilled to disk. - if (!_sorterIterator) - return GetNextResult::makeEOF(); - - _currentId = _firstPartOfNextGroup.first; - const size_t numAccumulators = _accumulatedFields.size(); - - // Call startNewGroup on every accumulator. - Value expandedId = expandId(_currentId); - Document idDoc = - expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); - for (size_t i = 0; i < numAccumulators; ++i) { - Value initializerValue = - _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables); - _currentAccumulators[i]->startNewGroup(initializerValue); - } - - while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) { - // Inside of this loop, _firstPartOfNextGroup is the current data being processed. - // At loop exit, it is the first value to be processed in the next group. - switch (numAccumulators) { // mirrors switch in spill() - case 1: // Single accumulators serialize as a single Value. - _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true); - [[fallthrough]]; - case 0: // No accumulators so no Values. 
- break; - default: { // Multiple accumulators serialize as an array of Values. - const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray(); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators[i]->process(accumulatorStates[i], true); - } - } - } - - if (!_sorterIterator->more()) { - dispose(); - break; - } - - _firstPartOfNextGroup = _sorterIterator->next(); - } - - return makeDocument(_currentId, _currentAccumulators, pExpCtx->needsMerge); -} - -DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() { - // Not spilled, and not streaming. - if (_groups->empty()) - return GetNextResult::makeEOF(); - - Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->needsMerge); - - if (++groupsIterator == _groups->end()) - dispose(); - - return out; -} - -void DocumentSourceGroup::doDispose() { - // Free our resources. - _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); - _sorterIterator.reset(); - - // Make us look done. - groupsIterator = _groups->end(); -} - -intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { - // Optimizing a 'DocumentSourceGroup' might modify its expressions to become incompatible with - // SBE. We temporarily highjack the context's 'sbeCompatible' flag to communicate the situation - // back to the 'DocumentSourceGroup'. Notice, that while a particular 'DocumentSourceGroup' - // might become incompatible with SBE, other groups in the pipeline and the collection access - // could be still eligible for lowering to SBE, thus we must reset the context's 'sbeCompatible' - // flag back to its original value at the end of the 'optimize()' call. - // - // TODO SERVER-XXXXX: replace this hack with a proper per-stage tracking of SBE compatibility. 
- auto expCtx = _idExpressions[0]->getExpressionContext(); - auto orgSbeCompatible = expCtx->sbeCompatible; - expCtx->sbeCompatible = true; - - // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there - // will be only one group. We should take advantage of that to avoid going through the hash - // table. - for (size_t i = 0; i < _idExpressions.size(); i++) { - _idExpressions[i] = _idExpressions[i]->optimize(); - } - - for (auto&& accumulatedField : _accumulatedFields) { - accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize(); - accumulatedField.expr.argument = accumulatedField.expr.argument->optimize(); - } - - _sbeCompatible = _sbeCompatible && expCtx->sbeCompatible; - expCtx->sbeCompatible = orgSbeCompatible; - - return this; -} - -Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - MutableDocument insides; - - // Add the _id. - if (_idFieldNames.empty()) { - invariant(_idExpressions.size() == 1); - insides["_id"] = _idExpressions[0]->serialize(static_cast<bool>(explain)); - } else { - // Decomposed document case. - invariant(_idExpressions.size() == _idFieldNames.size()); - MutableDocument md; - for (size_t i = 0; i < _idExpressions.size(); i++) { - md[_idFieldNames[i]] = _idExpressions[i]->serialize(static_cast<bool>(explain)); - } - insides["_id"] = md.freezeToValue(); - } - - // Add the remaining fields. - for (auto&& accumulatedField : _accumulatedFields) { - intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator(); - insides[accumulatedField.fieldName] = - Value(accum->serialize(accumulatedField.expr.initializer, - accumulatedField.expr.argument, - static_cast<bool>(explain))); - } - - if (_doingMerge) { - // This makes the output unparsable (with error) on pre 2.6 shards, but it will never - // be sent to old shards when this flag is true since they can't do a merge anyway. 
- insides["$doingMerge"] = Value(true); - } - - MutableDocument out; - out[getSourceName()] = Value(insides.freeze()); - - if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) { - MutableDocument md; - - for (size_t i = 0; i < _accumulatedFields.size(); i++) { - md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>( - _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes())); - } - - out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue()); - out["totalOutputDataSizeBytes"] = - Value(static_cast<long long>(_stats.totalOutputDataSizeBytes)); - out["usedDisk"] = Value(_stats.spills > 0); - out["spills"] = Value(static_cast<long long>(_stats.spills)); - } - - return Value(out.freezeToValue()); -} - -DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const { - // add the _id - for (size_t i = 0; i < _idExpressions.size(); i++) { - expression::addDependencies(_idExpressions[i].get(), deps); - } - - // add the rest - for (auto&& accumulatedField : _accumulatedFields) { - expression::addDependencies(accumulatedField.expr.argument.get(), deps); - // Don't add initializer, because it doesn't refer to docs from the input stream. - } - - return DepsTracker::State::EXHAUSTIVE_ALL; -} - -void DocumentSourceGroup::addVariableRefs(std::set<Variables::Id>* refs) const { - for (const auto& idExpr : _idExpressions) { - expression::addVariableRefs(idExpr.get(), refs); - } - - for (auto&& accumulatedField : _accumulatedFields) { - expression::addVariableRefs(accumulatedField.expr.argument.get(), refs); - } -} - -DocumentSource::GetModPathsReturn DocumentSourceGroup::getModifiedPaths() const { - // We preserve none of the fields, but any fields referenced as part of the group key are - // logically just renamed. - StringMap<std::string> renames; - for (std::size_t i = 0; i < _idExpressions.size(); ++i) { - auto idExp = _idExpressions[i]; - auto pathToPutResultOfExpression = - _idFieldNames.empty() ? "_id" : "_id." 
+ _idFieldNames[i]; - auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression); - for (auto&& rename : computedPaths.renames) { - renames[rename.first] = rename.second; - } - } - - return {DocumentSource::GetModPathsReturn::Type::kAllExcept, - OrderedPathSet{}, // No fields are preserved. - std::move(renames)}; -} - -StringMap<boost::intrusive_ptr<Expression>> DocumentSourceGroup::getIdFields() const { - if (_idFieldNames.empty()) { - invariant(_idExpressions.size() == 1); - return {{"_id", _idExpressions[0]}}; - } else { - invariant(_idFieldNames.size() == _idExpressions.size()); - StringMap<boost::intrusive_ptr<Expression>> result; - for (std::size_t i = 0; i < _idFieldNames.size(); ++i) { - result["_id." + _idFieldNames[i]] = _idExpressions[i]; - } - return result; - } -} - -std::vector<boost::intrusive_ptr<Expression>>& DocumentSourceGroup::getMutableIdFields() { - tassert(7020503, "Can't mutate _id fields after initialization", !_initialized); - return _idExpressions; -} - -const std::vector<AccumulationStatement>& DocumentSourceGroup::getAccumulatedFields() const { - return _accumulatedFields; -} - -std::vector<AccumulationStatement>& DocumentSourceGroup::getMutableAccumulatedFields() { - tassert(7020504, "Can't mutate accumulated fields after initialization", !_initialized); - return _accumulatedFields; -} - -intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create( - const intrusive_ptr<ExpressionContext>& expCtx, +boost::intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const boost::intrusive_ptr<Expression>& groupByExpression, std::vector<AccumulationStatement> accumulationStatements, boost::optional<size_t> maxMemoryUsageBytes) { - intrusive_ptr<DocumentSourceGroup> groupStage( + boost::intrusive_ptr<DocumentSourceGroup> groupStage( new DocumentSourceGroup(expCtx, maxMemoryUsageBytes)); groupStage->setIdExpression(groupByExpression); for (auto&& statement : 
accumulationStatements) { groupStage->addAccumulator(statement); - groupStage->_memoryTracker.set(statement.fieldName, 0); } return groupStage; } -DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& expCtx, +DocumentSourceGroup::DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<size_t> maxMemoryUsageBytes) - : DocumentSource(kStageName, expCtx), - _doingMerge(false), - _memoryTracker{expCtx->allowDiskUse && !expCtx->inMongos, - maxMemoryUsageBytes - ? *maxMemoryUsageBytes - : static_cast<size_t>(internalDocumentSourceGroupMaxMemoryBytes.load())}, - _initialized(false), - _groups(expCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), - _spilled(false), - _sbeCompatible(false) {} + : DocumentSourceGroupBase(kStageName, expCtx, maxMemoryUsageBytes), _groupsReady(false) {} -void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { - _accumulatedFields.push_back(accumulationStatement); +boost::intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return createFromBsonWithMaxMemoryUsage(std::move(elem), expCtx, boost::none); } -namespace { - -intrusive_ptr<Expression> parseIdExpression(const intrusive_ptr<ExpressionContext>& expCtx, - BSONElement groupField, - const VariablesParseState& vps) { - if (groupField.type() == Object) { - // {_id: {}} is treated as grouping on a constant, not an expression - if (groupField.Obj().isEmpty()) { - return ExpressionConstant::create(expCtx.get(), Value(groupField)); - } - - const BSONObj idKeyObj = groupField.Obj(); - if (idKeyObj.firstElementFieldName()[0] == '$') { - // grouping on a $op expression - return Expression::parseObject(expCtx.get(), idKeyObj, vps); - } else { - for (auto&& field : idKeyObj) { - uassert(17390, - "$group does not support inclusion-style expressions", - !field.isNumber() && field.type() != Bool); - } - 
return ExpressionObject::parse(expCtx.get(), idKeyObj, vps); - } - } else { - return Expression::parseOperand(expCtx.get(), groupField, vps); - } +boost::intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBsonWithMaxMemoryUsage( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes) { + boost::intrusive_ptr<DocumentSourceGroup> groupStage( + new DocumentSourceGroup(expCtx, maxMemoryUsageBytes)); + groupStage->initializeFromBson(elem); + return groupStage; } -} // namespace - -void DocumentSourceGroup::setIdExpression(const boost::intrusive_ptr<Expression> idExpression) { - if (auto object = dynamic_cast<ExpressionObject*>(idExpression.get())) { - auto& childExpressions = object->getChildExpressions(); - invariant(!childExpressions.empty()); // We expect to have converted an empty object into a - // constant expression. - - // grouping on an "artificial" object. Rather than create the object for each input - // in initialize(), instead group on the output of the raw expressions. The artificial - // object will be created at the end in makeDocument() while outputting results. - for (auto&& childExpPair : childExpressions) { - _idFieldNames.push_back(childExpPair.first); - _idExpressions.push_back(childExpPair.second); +DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() { + if (!_groupsReady) { + const auto initializationResult = performBlockingGroup(); + if (initializationResult.isPaused()) { + return initializationResult; } - } else { - _idExpressions.push_back(idExpression); - } -} - -boost::intrusive_ptr<Expression> DocumentSourceGroup::getIdExpression() const { - // _idFieldNames is empty and _idExpressions has one element when the _id expression is not an - // object expression. 
- if (_idFieldNames.empty() && _idExpressions.size() == 1) { - return _idExpressions[0]; - } - - tassert(6586300, - "Field and its expression must be always paired in ExpressionObject", - _idFieldNames.size() > 0 && _idFieldNames.size() == _idExpressions.size()); - - // Each expression in '_idExpressions' may have been optimized and so, compose the object _id - // expression out of the optimized expressions. - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fieldsAndExprs; - for (size_t i = 0; i < _idExpressions.size(); ++i) { - fieldsAndExprs.emplace_back(_idFieldNames[i], _idExpressions[i]); + invariant(initializationResult.isEOF()); } - return ExpressionObject::create(_idExpressions[0]->getExpressionContext(), - std::move(fieldsAndExprs)); -} - -intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson( - BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { - uassert(15947, "a group's fields must be specified in an object", elem.type() == Object); - - intrusive_ptr<DocumentSourceGroup> groupStage(new DocumentSourceGroup(expCtx)); - - BSONObj groupObj(elem.Obj()); - BSONObjIterator groupIterator(groupObj); - VariablesParseState vps = expCtx->variablesParseState; - expCtx->sbeGroupCompatible = true; - while (groupIterator.more()) { - BSONElement groupField(groupIterator.next()); - StringData pFieldName = groupField.fieldNameStringData(); - if (pFieldName == "_id") { - uassert(15948, - "a group's _id may only be specified once", - groupStage->_idExpressions.empty()); - groupStage->setIdExpression(parseIdExpression(expCtx, groupField, vps)); - invariant(!groupStage->_idExpressions.empty()); - } else if (pFieldName == "$doingMerge") { - massert(17030, "$doingMerge should be true if present", groupField.Bool()); - - groupStage->setDoingMerge(true); - } else { - // Any other field will be treated as an accumulator specification. 
- groupStage->addAccumulator( - AccumulationStatement::parseAccumulationStatement(expCtx.get(), groupField, vps)); - groupStage->_memoryTracker.set(pFieldName, 0); - } + auto result = getNextReadyGroup(); + if (result.isEOF()) { + dispose(); } - groupStage->_sbeCompatible = expCtx->sbeGroupCompatible && expCtx->sbeCompatible; - - uassert( - 15955, "a group specification must include an _id", !groupStage->_idExpressions.empty()); - return groupStage; + return result; } -namespace { - -using GroupsMap = DocumentSourceGroup::GroupsMap; - -class SorterComparator { -public: - SorterComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {} - - int operator()(const Value& lhs, const Value& rhs) const { - return _valueComparator.compare(lhs, rhs); - } - -private: - ValueComparator _valueComparator; -}; - -class SpillSTLComparator { -public: - SpillSTLComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {} - - bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const { - return _valueComparator.evaluate(lhs->first < rhs->first); - } - -private: - ValueComparator _valueComparator; -}; -} // namespace - -DocumentSource::GetNextResult DocumentSourceGroup::initialize() { +DocumentSource::GetNextResult DocumentSourceGroup::performBlockingGroup() { GetNextResult input = pSource->getNext(); - return initializeSelf(input); + return performBlockingGroupSelf(input); } -// This separate NOINLINE function is used here to decrease stack utilization of initialize() and -// prevent stack overflows. -MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initializeSelf( +// This separate NOINLINE function is used here to decrease stack utilization of +// performBlockingGroup() and prevent stack overflows. 
+MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::performBlockingGroupSelf( GetNextResult input) { - const size_t numAccumulators = _accumulatedFields.size(); + setExecutionStarted(); // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'. for (; input.isAdvanced(); input = pSource->getNext()) { if (shouldSpillWithAttemptToSaveMemory()) { - _sortedFiles.push_back(spill()); + spill(); } // We release the result document here so that it does not outlive the end of this loop // iteration. Not releasing could lead to an array copy when this group follows an unwind. auto rootDocument = input.releaseDocument(); Value id = computeId(rootDocument); - - // Look for the _id value in the map. If it's not there, add a new entry with a blank - // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and - // looking it up in '_groups' multiple times. - const size_t oldSize = _groups->size(); - vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id]; - const bool inserted = _groups->size() != oldSize; - - vector<uint64_t> oldAccumMemUsage(numAccumulators, 0); - if (inserted) { - _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize()); - - // Initialize and add the accumulators - Value expandedId = expandId(id); - Document idDoc = - expandedId.getType() == BSONType::Object ? 
expandedId.getDocument() : Document(); - group.reserve(numAccumulators); - for (auto&& accumulatedField : _accumulatedFields) { - auto accum = accumulatedField.makeAccumulator(); - Value initializerValue = - accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables); - accum->startNewGroup(initializerValue); - group.push_back(accum); - } - } - - /* tickle all the accumulators for the group we found */ - dassert(numAccumulators == group.size()); - - for (size_t i = 0; i < numAccumulators; i++) { - // Only process the input and update the memory footprint if the current accumulator - // needs more input. - if (group[i]->needsInput()) { - const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage(); - group[i]->process(_accumulatedFields[i].expr.argument->evaluate( - rootDocument, &pExpCtx->variables), - _doingMerge); - _memoryTracker.update(_accumulatedFields[i].fieldName, - group[i]->getMemUsage() - prevMemUsage); - } - } - - if (kDebugBuild && !pExpCtx->opCtx->readOnly()) { - // In debug mode, spill every time we have a duplicate id to stress merge logic. - if (!inserted && // is a dup - !pExpCtx->inMongos && // can't spill to disk in mongos - !_memoryTracker - ._allowDiskUse && // don't change behavior when testing external sort - _sortedFiles.size() < 20) { // don't open too many FDs - - _sortedFiles.push_back(spill()); - } - } + processDocument(id, rootDocument); } switch (input.getStatus()) { @@ -670,280 +138,14 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi return input; // Propagate pause. } case DocumentSource::GetNextResult::ReturnStatus::kEOF: { - // Do any final steps necessary to prepare to output results. - if (!_sortedFiles.empty()) { - _spilled = true; - if (!_groups->empty()) { - _sortedFiles.push_back(spill()); - } - - // We won't be using groups again so free its memory. 
- _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); - - _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( - _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); - - // prepare current to accumulate data - _currentAccumulators.reserve(numAccumulators); - for (auto&& accumulatedField : _accumulatedFields) { - _currentAccumulators.push_back(accumulatedField.makeAccumulator()); - } - - verify(_sorterIterator->more()); // we put data in, we should get something out. - _firstPartOfNextGroup = _sorterIterator->next(); - } else { - // start the group iterator - groupsIterator = _groups->begin(); - } - + readyGroups(); // This must happen last so that, unless control gets here, we will re-enter // initialization after getting a GetNextResult::ResultState::kPauseExecution. - _initialized = true; + _groupsReady = true; return input; } } MONGO_UNREACHABLE; } -shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { - _stats.spills++; - - vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting - ptrs.reserve(_groups->size()); - for (GroupsMap::const_iterator it = _groups->begin(), end = _groups->end(); it != end; ++it) { - ptrs.push_back(&*it); - } - - stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); - - // Initialize '_file' in a lazy manner only when it is needed. - if (!_file) { - _file = - std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" + nextFileName()); - } - SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file); - switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i. 
- case 0: // no values, essentially a distinct - for (size_t i = 0; i < ptrs.size(); i++) { - writer.addAlreadySorted(ptrs[i]->first, Value()); - } - break; - - case 1: // just one value, use optimized serialization as single Value - for (size_t i = 0; i < ptrs.size(); i++) { - writer.addAlreadySorted(ptrs[i]->first, - ptrs[i]->second[0]->getValue(/*toBeMerged=*/true)); - } - break; - - default: // multiple values, serialize as array-typed Value - for (size_t i = 0; i < ptrs.size(); i++) { - vector<Value> accums; - for (size_t j = 0; j < ptrs[i]->second.size(); j++) { - accums.push_back(ptrs[i]->second[j]->getValue(/*toBeMerged=*/true)); - } - writer.addAlreadySorted(ptrs[i]->first, Value(std::move(accums))); - } - break; - } - - auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx); - metricsCollector.incrementKeysSorted(ptrs.size()); - metricsCollector.incrementSorterSpills(1); - - _groups->clear(); - // Zero out the current per-accumulation statement memory consumption, as the memory has been - // freed by spilling. - for (const auto& accum : _accumulatedFields) { - _memoryTracker.set(accum.fieldName, 0); - } - - Sorter<Value, Value>::Iterator* iteratorPtr = writer.done(); - return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr); -} - -Value DocumentSourceGroup::computeId(const Document& root) { - // If only one expression, return result directly - if (_idExpressions.size() == 1) { - Value retValue = _idExpressions[0]->evaluate(root, &pExpCtx->variables); - return retValue.missing() ? 
Value(BSONNULL) : std::move(retValue); - } - - // Multiple expressions get results wrapped in a vector - vector<Value> vals; - vals.reserve(_idExpressions.size()); - for (size_t i = 0; i < _idExpressions.size(); i++) { - vals.push_back(_idExpressions[i]->evaluate(root, &pExpCtx->variables)); - } - return Value(std::move(vals)); -} - -Value DocumentSourceGroup::expandId(const Value& val) { - // _id doesn't get wrapped in a document - if (_idFieldNames.empty()) - return val; - - // _id is a single-field document containing val - if (_idFieldNames.size() == 1) - return Value(DOC(_idFieldNames[0] << val)); - - // _id is a multi-field document containing the elements of val - const vector<Value>& vals = val.getArray(); - invariant(_idFieldNames.size() == vals.size()); - MutableDocument md(vals.size()); - for (size_t i = 0; i < vals.size(); i++) { - md[_idFieldNames[i]] = vals[i]; - } - return md.freezeToValue(); -} - -Document DocumentSourceGroup::makeDocument(const Value& id, - const Accumulators& accums, - bool mergeableOutput) { - const size_t n = _accumulatedFields.size(); - MutableDocument out(1 + n); - - /* add the _id field */ - out.addField("_id", expandId(id)); - - /* add the rest of the fields */ - for (size_t i = 0; i < n; ++i) { - Value val = accums[i]->getValue(mergeableOutput); - if (val.missing()) { - // we return null in this case so return objects are predictable - out.addField(_accumulatedFields[i].fieldName, Value(BSONNULL)); - } else { - out.addField(_accumulatedFields[i].fieldName, std::move(val)); - } - } - - _stats.totalOutputDataSizeBytes += out.getApproximateSize(); - return out.freeze(); -} - -boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distributedPlanLogic() { - intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx)); - mergingGroup->setDoingMerge(true); - - VariablesParseState vps = pExpCtx->variablesParseState; - /* the merger will use the same grouping key */ - 
mergingGroup->setIdExpression(ExpressionFieldPath::parse(pExpCtx.get(), "$$ROOT._id", vps)); - - for (auto&& accumulatedField : _accumulatedFields) { - // The merger's output field names will be the same, as will the accumulator factories. - // However, for some accumulators, the expression to be accumulated will be different. The - // original accumulator may be collecting an expression based on a field expression or - // constant. Here, we accumulate the output of the same name from the prior group. - auto copiedAccumulatedField = accumulatedField; - copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse( - pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps); - mergingGroup->addAccumulator(copiedAccumulatedField); - mergingGroup->_memoryTracker.set(copiedAccumulatedField.fieldName, 0); - } - - // {shardsStage, mergingStage, sortPattern} - return DistributedPlanLogic{this, mergingGroup, boost::none}; -} - -bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const { - return std::any_of( - _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) { - if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) { - if (fieldExp->representsPath(dottedPath)) { - return true; - } - } - return false; - }); -} - -bool DocumentSourceGroup::canRunInParallelBeforeWriteStage( - const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const { - if (_doingMerge) { - return true; // This is fine. - } - - // Certain $group stages are allowed to execute on each exchange consumer. In order to - // guarantee each consumer will only group together data from its own shard, the $group must - // group on a superset of the shard key. - for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) { - if (!pathIncludedInGroupKeys(currentPathOfShardKey)) { - // This requires an exact path match, but as a future optimization certain path - // prefixes should be okay. 
For example, if the shard key path is "a.b", and we're - // grouping by "a", then each group of "a" is strictly more specific than "a.b", so - // we can deduce that grouping by "a" will not need to group together documents - // across different values of the shard key field "a.b", and thus as long as any - // other shard key fields are similarly preserved will not need to consume a merged - // stream to perform the group. - return false; - } - } - return true; -} - -std::unique_ptr<GroupFromFirstDocumentTransformation> -DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const { - if (_idExpressions.size() != 1) { - // This transformation is only intended for $group stages that group on a single field. - return nullptr; - } - - auto fieldPathExpr = dynamic_cast<ExpressionFieldPath*>(_idExpressions.front().get()); - if (!fieldPathExpr || fieldPathExpr->isVariableReference()) { - return nullptr; - } - - const auto fieldPath = fieldPathExpr->getFieldPath(); - if (fieldPath.getPathLength() == 1) { - // The path is $$CURRENT or $$ROOT. This isn't really a sensible value to group by (since - // each document has a unique _id, it will just return the entire collection). We only - // apply the rewrite when grouping by a single field, so we cannot apply it in this case, - // where we are grouping by the entire document. - tassert(5943200, - "Optimization attempted on group by always-dissimilar system variable", - fieldPath.getFieldName(0) == "CURRENT" || fieldPath.getFieldName(0) == "ROOT"); - return nullptr; - } - - const auto groupId = fieldPath.tail().fullPath(); - - // We can't do this transformation if there are any non-$first accumulators. 
- for (auto&& accumulator : _accumulatedFields) { - if (AccumulatorDocumentsNeeded::kFirstDocument != - accumulator.makeAccumulator()->documentsNeeded()) { - return nullptr; - } - } - - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fields; - - boost::intrusive_ptr<Expression> idField; - // The _id field can be specified either as a fieldpath (ex. _id: "$a") or as a singleton - // object (ex. _id: {v: "$a"}). - if (_idFieldNames.empty()) { - idField = ExpressionFieldPath::deprecatedCreate(pExpCtx.get(), groupId); - } else { - invariant(_idFieldNames.size() == 1); - idField = ExpressionObject::create(pExpCtx.get(), - {{_idFieldNames.front(), _idExpressions.front()}}); - } - fields.push_back(std::make_pair("_id", idField)); - - for (auto&& accumulator : _accumulatedFields) { - fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument)); - - // Since we don't attempt this transformation for non-$first accumulators, - // the initializer should always be trivial. - } - - return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields)); -} - -size_t DocumentSourceGroup::getMaxMemoryUsageBytes() const { - return _memoryTracker._maxAllowedMemoryUsageBytes; -} - } // namespace mongo - -#include "mongo/db/sorter/sorter.cpp" -// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. 
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 2a3ac2ea89c..a1f8e9b2b9a 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -32,90 +32,19 @@ #include <memory> #include <utility> -#include "mongo/db/pipeline/accumulation_statement.h" -#include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/memory_usage_tracker.h" -#include "mongo/db/pipeline/transformer_interface.h" -#include "mongo/db/sorter/sorter.h" +#include "mongo/db/pipeline/document_source_group_base.h" namespace mongo { /** - * GroupFromFirstTransformation consists of a list of (field name, expression pairs). It returns a - * document synthesized by assigning each field name in the output document to the result of - * evaluating the corresponding expression. If the expression evaluates to missing, we assign a - * value of BSONNULL. This is necessary to match the semantics of $first for missing fields. + * This class represents a hash-based group implementation that stores all groups until the source + * is depleted and only then starts outputting documents. */ -class GroupFromFirstDocumentTransformation final : public TransformerInterface { +class DocumentSourceGroup final : public DocumentSourceGroupBase { public: - GroupFromFirstDocumentTransformation( - const std::string& groupId, - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs) - : _accumulatorExprs(std::move(accumulatorExprs)), _groupId(groupId) {} - - TransformerType getType() const final { - return TransformerType::kGroupFromFirstDocument; - } - - /** - * The path of the field that we are grouping on: i.e., the field in the input document that we - * will use to create the _id field of the ouptut document. 
- */ - const std::string& groupId() const { - return _groupId; - } - - Document applyTransformation(const Document& input) final; - - void optimize() final; - - Document serializeTransformation( - boost::optional<ExplainOptions::Verbosity> explain) const final; - - DepsTracker::State addDependencies(DepsTracker* deps) const final; - - void addVariableRefs(std::set<Variables::Id>* refs) const final; - - DocumentSource::GetModPathsReturn getModifiedPaths() const final; - - static std::unique_ptr<GroupFromFirstDocumentTransformation> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const std::string& groupId, - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs); - -private: - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs; - std::string _groupId; -}; - -class DocumentSourceGroup final : public DocumentSource { -public: - using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>; - using GroupsMap = ValueUnorderedMap<Accumulators>; - static constexpr StringData kStageName = "$group"_sd; - boost::intrusive_ptr<DocumentSource> optimize() final; - DepsTracker::State getDependencies(DepsTracker* deps) const final; - void addVariableRefs(std::set<Variables::Id>* refs) const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; const char* getSourceName() const final; - GetModPathsReturn getModifiedPaths() const final; - StringMap<boost::intrusive_ptr<Expression>> getIdFields() const; - - /** - * Can be used to change or swap out individual _id fields, but should not be used - * once execution has begun. - */ - std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields(); - const std::vector<AccumulationStatement>& getAccumulatedFields() const; - - /** - * Can be used to change or swap out individual accumulated fields, but should not be used - * once execution has begun. 
- */ - std::vector<AccumulationStatement>& getMutableAccumulatedFields(); /** * Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none, @@ -133,199 +62,40 @@ public: */ static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints(StreamType::kBlocking, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed, - TransactionRequirement::kAllowed, - LookupRequirement::kAllowed, - UnionRequirement::kAllowed); - constraints.canSwapWithMatch = true; - return constraints; - } - - /** - * Add an accumulator, which will become a field in each Document that results from grouping. - */ - void addAccumulator(AccumulationStatement accumulationStatement); - - /** - * Sets the expression to use to determine the group id of each document. - */ - void setIdExpression(boost::intrusive_ptr<Expression> idExpression); - - /** - * Returns the expression to use to determine the group id of each document. - */ - boost::intrusive_ptr<Expression> getIdExpression() const; - - /** - * Returns true if this $group stage represents a 'global' $group which is merging together - * results from earlier partial groups. - */ - bool doingMerge() const { - return _doingMerge; - } - - /** - * Tell this source if it is doing a merge from shards. Defaults to false. - */ - void setDoingMerge(bool doingMerge) { - _doingMerge = doingMerge; - } - - /** - * Returns true if this $group stage used disk during execution and false otherwise. 
- */ - bool usedDisk() final { - return _stats.spills > 0; - } - - const SpecificStats* getSpecificStats() const final { - return &_stats; - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() final; - bool canRunInParallelBeforeWriteStage( - const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final; - - /** - * When possible, creates a document transformer that transforms the first document in a group - * into one of the output documents of the $group stage. This is possible when we are grouping - * on a single field and all accumulators are $first (or there are no accumluators). - * - * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group, - * in which case this transformation can replace the actual $group stage in the pipeline - * (SERVER-9507). - */ - std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument() - const; - - /** - * Returns maximum allowed memory footprint. - */ - size_t getMaxMemoryUsageBytes() const; - - // True if this $group can be pushed down to SBE. - bool sbeCompatible() const { - return _sbeCompatible; - } + static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes); protected: GetNextResult doGetNext() final; - void doDispose() final; + + bool isSpecFieldReserved(StringData) final { + return false; + } private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<size_t> maxMemoryUsageBytes = boost::none); /** - * getNext() dispatches to one of these three depending on what type of $group it is. These - * methods expect '_currentAccumulators' to have been reset before being called, and also expect - * initialize() to have been called already. 
- */ - GetNextResult getNextSpilled(); - GetNextResult getNextStandard(); - - /** - * Before returning anything, this source must prepare itself. In a streaming $group, - * initialize() requests the first document from the previous source, and uses it to prepare the - * accumulators. In an unsorted $group, initialize() exhausts the previous source before - * returning. The '_initialized' boolean indicates that initialize() has finished. + * Before returning anything, this source must prepare itself. performBlockingGroup() exhausts + * the previous source before + * returning. The '_groupsReady' boolean indicates that performBlockingGroup() has finished. * * This method may not be able to finish initialization in a single call if 'pSource' returns a * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult * encountered, which may be either kEOF or kPauseExecution. */ - GetNextResult initialize(); + GetNextResult performBlockingGroup(); /** - * Initializes this $group after any children are potentially initialized see initialize() for + * Initializes this $group after any children are initialized. See performBlockingGroup() for * more details. */ - GetNextResult initializeSelf(GetNextResult input); - - /** - * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group - * does not exhaust the previous stage before returning, and thus does not maintain as large a - * store of documents at any one time, only an unsorted group can spill to disk. - */ - std::shared_ptr<Sorter<Value, Value>::Iterator> spill(); - - /** - * If we ran out of memory, finish all the pending operations so that some memory - * can be freed. - */ - void freeMemory(); - - Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); - - /** - * Computes the internal representation of the group key. 
- */ - Value computeId(const Document& root); - - /** - * Converts the internal representation of the group key to the _id shape specified by the - * user. - */ - Value expandId(const Value& val); - - /** - * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'. - */ - bool pathIncludedInGroupKeys(const std::string& dottedPath) const; - - /** - * Cleans up any pending memory usage. Throws error, if memory usage is above - * 'maxMemoryUsageBytes' and cannot spill to disk. - * - * Returns true, if the caller should spill to disk, false otherwise. - */ - bool shouldSpillWithAttemptToSaveMemory(); - - std::vector<AccumulationStatement> _accumulatedFields; - - bool _doingMerge; - - MemoryUsageTracker _memoryTracker; - - GroupStats _stats; - - std::shared_ptr<Sorter<Value, Value>::File> _file; - - // If the expression for the '_id' field represents a non-empty object, we track its fields' - // names in '_idFieldNames'. - std::vector<std::string> _idFieldNames; - // Expressions for the individual fields when '_id' produces a document in the order of - // '_idFieldNames' or the whole expression otherwise. - std::vector<boost::intrusive_ptr<Expression>> _idExpressions; - - bool _initialized; - - Value _currentId; - Accumulators _currentAccumulators; - - // We use boost::optional to defer initialization until the ExpressionContext containing the - // correct comparator is injected, since the groups must be built using the comparator's - // definition of equality. - boost::optional<GroupsMap> _groups; - - std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles; - bool _spilled; - - // Only used when '_spilled' is false. - GroupsMap::iterator groupsIterator; - - // Only used when '_spilled' is true. 
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - - std::pair<Value, Value> _firstPartOfNextGroup; + GetNextResult performBlockingGroupSelf(GetNextResult input); - bool _sbeCompatible; + bool _groupsReady; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_group_base.cpp b/src/mongo/db/pipeline/document_source_group_base.cpp new file mode 100644 index 00000000000..aea2b47084a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_group_base.cpp @@ -0,0 +1,821 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/exec/document_value/value_comparator.h" +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_group_base.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/expression_dependencies.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/stats/resource_consumption_metrics.h" +#include "mongo/util/destructor_guard.h" + +namespace mongo { + +namespace { + +/** + * Generates a new file name on each call using a static, atomic and monotonically increasing + * number. + * + * Each user of the Sorter must implement this function to ensure that all temporary files that the + * Sorter instances produce are uniquely identified using a unique file name extension with separate + * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple + * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID. + */ +std::string nextFileName() { + static AtomicWord<unsigned> documentSourceGroupFileCounter; + return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1)); +} + +} // namespace + +using boost::intrusive_ptr; +using std::pair; +using std::shared_ptr; +using std::vector; + +Value DocumentSourceGroupBase::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + MutableDocument insides; + + // Add the _id. + if (_idFieldNames.empty()) { + invariant(_idExpressions.size() == 1); + insides["_id"] = _idExpressions[0]->serialize(static_cast<bool>(explain)); + } else { + // Decomposed document case. 
+ invariant(_idExpressions.size() == _idFieldNames.size()); + MutableDocument md; + for (size_t i = 0; i < _idExpressions.size(); i++) { + md[_idFieldNames[i]] = _idExpressions[i]->serialize(static_cast<bool>(explain)); + } + insides["_id"] = md.freezeToValue(); + } + + // Add the remaining fields. + for (auto&& accumulatedField : _accumulatedFields) { + intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator(); + insides[accumulatedField.fieldName] = + Value(accum->serialize(accumulatedField.expr.initializer, + accumulatedField.expr.argument, + static_cast<bool>(explain))); + } + + if (_doingMerge) { + insides["$doingMerge"] = Value(true); + } + + serializeAdditionalFields(insides, explain); + + MutableDocument out; + out[getSourceName()] = insides.freezeToValue(); + + if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) { + MutableDocument md; + + for (size_t i = 0; i < _accumulatedFields.size(); i++) { + md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>( + _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes())); + } + + out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue()); + out["totalOutputDataSizeBytes"] = + Value(static_cast<long long>(_stats.totalOutputDataSizeBytes)); + out["usedDisk"] = Value(_stats.spills > 0); + out["spills"] = Value(static_cast<long long>(_stats.spills)); + } + + return out.freezeToValue(); +} + + +bool DocumentSourceGroupBase::shouldSpillWithAttemptToSaveMemory() { + if (!_memoryTracker._allowDiskUse && + (_memoryTracker.currentMemoryBytes() > + static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes))) { + freeMemory(); + } + + if (_memoryTracker.currentMemoryBytes() > + static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) { + uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + "Exceeded memory limit for $group, but didn't allow external sort." 
+ " Pass allowDiskUse:true to opt in.", + _memoryTracker._allowDiskUse); + _memoryTracker.resetCurrent(); + return true; + } + return false; +} + +void DocumentSourceGroupBase::freeMemory() { + invariant(_groups); + for (auto&& group : *_groups) { + for (size_t i = 0; i < group.second.size(); i++) { + // Subtract the current usage. + _memoryTracker.update(_accumulatedFields[i].fieldName, + -1 * group.second[i]->getMemUsage()); + + group.second[i]->reduceMemoryConsumptionIfAble(); + + // Update the memory usage for this AccumulationStatement. + _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage()); + } + } +} + +DocumentSource::GetNextResult DocumentSourceGroupBase::getNextReadyGroup() { + if (_spilled) { + return getNextSpilled(); + } else { + return getNextStandard(); + } +} + +DocumentSource::GetNextResult DocumentSourceGroupBase::getNextSpilled() { + // We aren't streaming, and we have spilled to disk. + if (!_sorterIterator) + return GetNextResult::makeEOF(); + + Value currentId = _firstPartOfNextGroup.first; + const size_t numAccumulators = _accumulatedFields.size(); + + // Call startNewGroup on every accumulator. + Value expandedId = expandId(currentId); + Document idDoc = + expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); + for (size_t i = 0; i < numAccumulators; ++i) { + Value initializerValue = + _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables); + _currentAccumulators[i]->reset(); + _currentAccumulators[i]->startNewGroup(initializerValue); + } + + while (pExpCtx->getValueComparator().evaluate(currentId == _firstPartOfNextGroup.first)) { + // Inside of this loop, _firstPartOfNextGroup is the current data being processed. + // At loop exit, it is the first value to be processed in the next group. + switch (numAccumulators) { // mirrors switch in spill() + case 1: // Single accumulators serialize as a single Value. 
+ _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true); + [[fallthrough]]; + case 0: // No accumulators so no Values. + break; + default: { // Multiple accumulators serialize as an array of Values. + const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray(); + for (size_t i = 0; i < numAccumulators; i++) { + _currentAccumulators[i]->process(accumulatorStates[i], true); + } + } + } + + if (!_sorterIterator->more()) { + _sorterIterator.reset(); + break; + } + + _firstPartOfNextGroup = _sorterIterator->next(); + } + + return makeDocument(currentId, _currentAccumulators, pExpCtx->needsMerge); +} + +DocumentSource::GetNextResult DocumentSourceGroupBase::getNextStandard() { + // Not spilled, and not streaming. + if (_groupsIterator == _groups->end()) + return GetNextResult::makeEOF(); + + Document out = + makeDocument(_groupsIterator->first, _groupsIterator->second, pExpCtx->needsMerge); + ++_groupsIterator; + return out; +} + +void DocumentSourceGroupBase::doDispose() { + resetReadyGroups(); +} + +intrusive_ptr<DocumentSource> DocumentSourceGroupBase::optimize() { + // Optimizing a 'DocumentSourceGroupBase' might modify its expressions to become incompatible + // with SBE. We temporarily hijack the context's 'sbeCompatible' flag to communicate the + // situation back to the 'DocumentSourceGroupBase'. Note that while a particular + // 'DocumentSourceGroupBase' might become incompatible with SBE, other groups in the pipeline + // and the collection access could still be eligible for lowering to SBE, thus we must reset the + // context's 'sbeCompatible' flag back to its original value at the end of the 'optimize()' + // call. + // + // TODO SERVER-XXXXX: replace this hack with a proper per-stage tracking of SBE compatibility. 
+ auto expCtx = _idExpressions[0]->getExpressionContext(); + auto orgSbeCompatible = expCtx->sbeCompatible; + expCtx->sbeCompatible = true; + + // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there + // will be only one group. We should take advantage of that to avoid going through the hash + // table. + for (size_t i = 0; i < _idExpressions.size(); i++) { + _idExpressions[i] = _idExpressions[i]->optimize(); + } + + for (auto&& accumulatedField : _accumulatedFields) { + accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize(); + accumulatedField.expr.argument = accumulatedField.expr.argument->optimize(); + } + + _sbeCompatible = _sbeCompatible && expCtx->sbeCompatible; + expCtx->sbeCompatible = orgSbeCompatible; + + return this; +} + +DepsTracker::State DocumentSourceGroupBase::getDependencies(DepsTracker* deps) const { + // add the _id + for (size_t i = 0; i < _idExpressions.size(); i++) { + expression::addDependencies(_idExpressions[i].get(), deps); + } + + // add the rest + for (auto&& accumulatedField : _accumulatedFields) { + expression::addDependencies(accumulatedField.expr.argument.get(), deps); + // Don't add initializer, because it doesn't refer to docs from the input stream. + } + + return DepsTracker::State::EXHAUSTIVE_ALL; +} + +void DocumentSourceGroupBase::addVariableRefs(std::set<Variables::Id>* refs) const { + for (const auto& idExpr : _idExpressions) { + expression::addVariableRefs(idExpr.get(), refs); + } + + for (auto&& accumulatedField : _accumulatedFields) { + expression::addVariableRefs(accumulatedField.expr.argument.get(), refs); + } +} + +DocumentSource::GetModPathsReturn DocumentSourceGroupBase::getModifiedPaths() const { + // We preserve none of the fields, but any fields referenced as part of the group key are + // logically just renamed. 
+ StringMap<std::string> renames; + for (std::size_t i = 0; i < _idExpressions.size(); ++i) { + auto idExp = _idExpressions[i]; + auto pathToPutResultOfExpression = + _idFieldNames.empty() ? "_id" : "_id." + _idFieldNames[i]; + auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression); + for (auto&& rename : computedPaths.renames) { + renames[rename.first] = rename.second; + } + } + + return {DocumentSource::GetModPathsReturn::Type::kAllExcept, + OrderedPathSet{}, // No fields are preserved. + std::move(renames)}; +} + +StringMap<boost::intrusive_ptr<Expression>> DocumentSourceGroupBase::getIdFields() const { + if (_idFieldNames.empty()) { + invariant(_idExpressions.size() == 1); + return {{"_id", _idExpressions[0]}}; + } else { + invariant(_idFieldNames.size() == _idExpressions.size()); + StringMap<boost::intrusive_ptr<Expression>> result; + for (std::size_t i = 0; i < _idFieldNames.size(); ++i) { + result["_id." + _idFieldNames[i]] = _idExpressions[i]; + } + return result; + } +} + +std::vector<boost::intrusive_ptr<Expression>>& DocumentSourceGroupBase::getMutableIdFields() { + tassert(7020503, "Can't mutate _id fields after initialization", !_executionStarted); + return _idExpressions; +} + +const std::vector<AccumulationStatement>& DocumentSourceGroupBase::getAccumulatedFields() const { + return _accumulatedFields; +} + +std::vector<AccumulationStatement>& DocumentSourceGroupBase::getMutableAccumulatedFields() { + tassert(7020504, "Can't mutate accumulated fields after initialization", !_executionStarted); + return _accumulatedFields; +} + +DocumentSourceGroupBase::DocumentSourceGroupBase(StringData stageName, + const intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes) + : DocumentSource(stageName, expCtx), + _doingMerge(false), + _memoryTracker{expCtx->allowDiskUse && !expCtx->inMongos, + maxMemoryUsageBytes + ? 
*maxMemoryUsageBytes + : static_cast<size_t>(internalDocumentSourceGroupMaxMemoryBytes.load())}, + _executionStarted(false), + _groups(expCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), + _spilled(false), + _sbeCompatible(false) {} + +void DocumentSourceGroupBase::addAccumulator(AccumulationStatement accumulationStatement) { + _accumulatedFields.push_back(accumulationStatement); + _memoryTracker.set(accumulationStatement.fieldName, 0); +} + +namespace { + +intrusive_ptr<Expression> parseIdExpression(const intrusive_ptr<ExpressionContext>& expCtx, + BSONElement groupField, + const VariablesParseState& vps) { + if (groupField.type() == Object) { + // {_id: {}} is treated as grouping on a constant, not an expression + if (groupField.Obj().isEmpty()) { + return ExpressionConstant::create(expCtx.get(), Value(groupField)); + } + + const BSONObj idKeyObj = groupField.Obj(); + if (idKeyObj.firstElementFieldName()[0] == '$') { + // grouping on a $op expression + return Expression::parseObject(expCtx.get(), idKeyObj, vps); + } else { + for (auto&& field : idKeyObj) { + uassert(17390, + "$group does not support inclusion-style expressions", + !field.isNumber() && field.type() != Bool); + } + return ExpressionObject::parse(expCtx.get(), idKeyObj, vps); + } + } else { + return Expression::parseOperand(expCtx.get(), groupField, vps); + } +} + +} // namespace + +void DocumentSourceGroupBase::setIdExpression(const boost::intrusive_ptr<Expression> idExpression) { + if (auto object = dynamic_cast<ExpressionObject*>(idExpression.get())) { + auto& childExpressions = object->getChildExpressions(); + invariant(!childExpressions.empty()); // We expect to have converted an empty object into a + // constant expression. + + // grouping on an "artificial" object. Rather than create the object for each input + // in initialize(), instead group on the output of the raw expressions. 
The artificial + // object will be created at the end in makeDocument() while outputting results. + for (auto&& childExpPair : childExpressions) { + _idFieldNames.push_back(childExpPair.first); + _idExpressions.push_back(childExpPair.second); + } + } else { + _idExpressions.push_back(idExpression); + } +} + +boost::intrusive_ptr<Expression> DocumentSourceGroupBase::getIdExpression() const { + // _idFieldNames is empty and _idExpressions has one element when the _id expression is not an + // object expression. + if (_idFieldNames.empty() && _idExpressions.size() == 1) { + return _idExpressions[0]; + } + + tassert(6586300, + "Field and its expression must be always paired in ExpressionObject", + _idFieldNames.size() > 0 && _idFieldNames.size() == _idExpressions.size()); + + // Each expression in '_idExpressions' may have been optimized and so, compose the object _id + // expression out of the optimized expressions. + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fieldsAndExprs; + for (size_t i = 0; i < _idExpressions.size(); ++i) { + fieldsAndExprs.emplace_back(_idFieldNames[i], _idExpressions[i]); + } + + return ExpressionObject::create(_idExpressions[0]->getExpressionContext(), + std::move(fieldsAndExprs)); +} + +void DocumentSourceGroupBase::initializeFromBson(BSONElement elem) { + uassert(15947, "a group's fields must be specified in an object", elem.type() == Object); + + BSONObj groupObj(elem.Obj()); + BSONObjIterator groupIterator(groupObj); + VariablesParseState vps = pExpCtx->variablesParseState; + pExpCtx->sbeGroupCompatible = true; + while (groupIterator.more()) { + BSONElement groupField(groupIterator.next()); + StringData pFieldName = groupField.fieldNameStringData(); + if (pFieldName == "_id") { + uassert(15948, "a group's _id may only be specified once", _idExpressions.empty()); + setIdExpression(parseIdExpression(pExpCtx, groupField, vps)); + invariant(!_idExpressions.empty()); + } else if (pFieldName == "$doingMerge") { + 
massert(17030, "$doingMerge should be true if present", groupField.Bool()); + + setDoingMerge(true); + } else if (isSpecFieldReserved(pFieldName)) { + // No-op: field is used by the derived class. + } else { + // Any other field will be treated as an accumulator specification. + addAccumulator( + AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), groupField, vps)); + } + } + _sbeCompatible = pExpCtx->sbeGroupCompatible && pExpCtx->sbeCompatible; + + uassert(15955, "a group specification must include an _id", !_idExpressions.empty()); +} + +namespace { + +using GroupsMap = DocumentSourceGroupBase::GroupsMap; + +class SorterComparator { +public: + SorterComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {} + + int operator()(const Value& lhs, const Value& rhs) const { + return _valueComparator.compare(lhs, rhs); + } + +private: + ValueComparator _valueComparator; +}; + +class SpillSTLComparator { +public: + SpillSTLComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {} + + bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const { + return _valueComparator.evaluate(lhs->first < rhs->first); + } + +private: + ValueComparator _valueComparator; +}; +} // namespace + +void DocumentSourceGroupBase::processDocument(const Value& id, const Document& root) { + const size_t numAccumulators = _accumulatedFields.size(); + + // Look for the _id value in the map. If it's not there, add a new entry with a blank + // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and + // looking it up in '_groups' multiple times. 
+ const size_t oldSize = _groups->size(); + vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id]; + const bool inserted = _groups->size() != oldSize; + + if (inserted) { + // A new group was created: charge the approximate size of its key to the memory tracker. + _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize()); + + // Initialize and add the accumulators + Value expandedId = expandId(id); + Document idDoc = + expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document(); + group.reserve(numAccumulators); + for (auto&& accumulatedField : _accumulatedFields) { + auto accum = accumulatedField.makeAccumulator(); + Value initializerValue = + accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables); + accum->startNewGroup(initializerValue); + group.push_back(std::move(accum)); + } + } + + /* tickle all the accumulators for the group we found */ + dassert(numAccumulators == group.size()); + + for (size_t i = 0; i < numAccumulators; i++) { + // Only process the input and update the memory footprint if the current accumulator + // needs more input. + if (group[i]->needsInput()) { + // An accumulator of a newly inserted group is charged in full; an existing one is + // charged only for the change in its footprint. + const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage(); + group[i]->process( + _accumulatedFields[i].expr.argument->evaluate(root, &pExpCtx->variables), + _doingMerge); + _memoryTracker.update(_accumulatedFields[i].fieldName, + group[i]->getMemUsage() - prevMemUsage); + } + } + + if (kDebugBuild && !pExpCtx->opCtx->readOnly()) { + // In debug mode, spill every time we have a duplicate id to stress merge logic. 
+ if (!inserted && // is a dup + !pExpCtx->inMongos && // can't spill to disk in mongos + !_memoryTracker._allowDiskUse && // don't change behavior when testing external sort + _sortedFiles.size() < 20) { // don't open too many FDs + spill(); + } + } +} + +void DocumentSourceGroupBase::readyGroups() { + _spilled = !_sortedFiles.empty(); + if (_spilled) { + if (!_groups->empty()) { + spill(); + } + + _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); + + _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge( + _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator()))); + + // prepare current to accumulate data + _currentAccumulators.reserve(_accumulatedFields.size()); + for (auto&& accumulatedField : _accumulatedFields) { + _currentAccumulators.push_back(accumulatedField.makeAccumulator()); + } + + verify(_sorterIterator->more()); // we put data in, we should get something out. + _firstPartOfNextGroup = _sorterIterator->next(); + } else { + // start the group iterator + _groupsIterator = _groups->begin(); + } +} + +void DocumentSourceGroupBase::resetReadyGroups() { + // Free our resources. + _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); + _memoryTracker.resetCurrent(); + _sorterIterator.reset(); + _sortedFiles.clear(); + + // Make us look done. + _groupsIterator = _groups->end(); +} + +void DocumentSourceGroupBase::spill() { + _stats.spills++; + + vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting + ptrs.reserve(_groups->size()); + for (GroupsMap::const_iterator it = _groups->begin(), end = _groups->end(); it != end; ++it) { + ptrs.push_back(&*it); + } + + stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator())); + + // Initialize '_file' in a lazy manner only when it is needed. 
+ if (!_file) { + _file = + std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" + nextFileName()); + } + SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file); + switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i. + case 0: // no values, essentially a distinct + for (size_t i = 0; i < ptrs.size(); i++) { + writer.addAlreadySorted(ptrs[i]->first, Value()); + } + break; + + case 1: // just one value, use optimized serialization as single Value + for (size_t i = 0; i < ptrs.size(); i++) { + writer.addAlreadySorted(ptrs[i]->first, + ptrs[i]->second[0]->getValue(/*toBeMerged=*/true)); + } + break; + + default: // multiple values, serialize as array-typed Value + for (size_t i = 0; i < ptrs.size(); i++) { + vector<Value> accums; + for (size_t j = 0; j < ptrs[i]->second.size(); j++) { + accums.push_back(ptrs[i]->second[j]->getValue(/*toBeMerged=*/true)); + } + writer.addAlreadySorted(ptrs[i]->first, Value(std::move(accums))); + } + break; + } + + auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx); + metricsCollector.incrementKeysSorted(ptrs.size()); + metricsCollector.incrementSorterSpills(1); + + _groups->clear(); + // Zero out the current per-accumulation statement memory consumption, as the memory has been + // freed by spilling. + for (const auto& accum : _accumulatedFields) { + _memoryTracker.set(accum.fieldName, 0); + } + + _sortedFiles.emplace_back(writer.done()); +} + +Value DocumentSourceGroupBase::computeId(const Document& root) { + // If only one expression, return result directly + if (_idExpressions.size() == 1) { + Value retValue = _idExpressions[0]->evaluate(root, &pExpCtx->variables); + return retValue.missing() ? 
Value(BSONNULL) : std::move(retValue); + } + + // Multiple expressions get results wrapped in a vector + vector<Value> vals; + vals.reserve(_idExpressions.size()); + for (size_t i = 0; i < _idExpressions.size(); i++) { + vals.push_back(_idExpressions[i]->evaluate(root, &pExpCtx->variables)); + } + return Value(std::move(vals)); +} + +Value DocumentSourceGroupBase::expandId(const Value& val) { + // _id doesn't get wrapped in a document + if (_idFieldNames.empty()) + return val; + + // _id is a single-field document containing val + if (_idFieldNames.size() == 1) + return Value(DOC(_idFieldNames[0] << val)); + + // _id is a multi-field document containing the elements of val + const vector<Value>& vals = val.getArray(); + invariant(_idFieldNames.size() == vals.size()); + MutableDocument md(vals.size()); + for (size_t i = 0; i < vals.size(); i++) { + md[_idFieldNames[i]] = vals[i]; + } + return md.freezeToValue(); +} + +Document DocumentSourceGroupBase::makeDocument(const Value& id, + const Accumulators& accums, + bool mergeableOutput) { + const size_t n = _accumulatedFields.size(); + MutableDocument out(1 + n); + + /* add the _id field */ + out.addField("_id", expandId(id)); + + /* add the rest of the fields */ + for (size_t i = 0; i < n; ++i) { + Value val = accums[i]->getValue(mergeableOutput); + if (val.missing()) { + // we return null in this case so return objects are predictable + out.addField(_accumulatedFields[i].fieldName, Value(BSONNULL)); + } else { + out.addField(_accumulatedFields[i].fieldName, std::move(val)); + } + } + + _stats.totalOutputDataSizeBytes += out.getApproximateSize(); + return out.freeze(); +} + +bool DocumentSourceGroupBase::pathIncludedInGroupKeys(const std::string& dottedPath) const { + return std::any_of( + _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) { + if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) { + if (fieldExp->representsPath(dottedPath)) { + return true; + } + } + return 
false; + }); +} + +bool DocumentSourceGroupBase::canRunInParallelBeforeWriteStage( + const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const { + if (_doingMerge) { + return true; // This is fine. + } + + // Certain $group stages are allowed to execute on each exchange consumer. In order to + // guarantee each consumer will only group together data from its own shard, the $group must + // group on a superset of the shard key. + for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) { + if (!pathIncludedInGroupKeys(currentPathOfShardKey)) { + // This requires an exact path match, but as a future optimization certain path + // prefixes should be okay. For example, if the shard key path is "a.b", and we're + // grouping by "a", then each group of "a" is strictly more specific than "a.b", so + // we can deduce that grouping by "a" will not need to group together documents + // across different values of the shard key field "a.b", and thus as long as any + // other shard key fields are similarly preserved will not need to consume a merged + // stream to perform the group. + return false; + } + } + return true; +} + +std::unique_ptr<GroupFromFirstDocumentTransformation> +DocumentSourceGroupBase::rewriteGroupAsTransformOnFirstDocument() const { + if (_idExpressions.size() != 1) { + // This transformation is only intended for $group stages that group on a single field. + return nullptr; + } + + auto fieldPathExpr = dynamic_cast<ExpressionFieldPath*>(_idExpressions.front().get()); + if (!fieldPathExpr || fieldPathExpr->isVariableReference()) { + return nullptr; + } + + const auto fieldPath = fieldPathExpr->getFieldPath(); + if (fieldPath.getPathLength() == 1) { + // The path is $$CURRENT or $$ROOT. This isn't really a sensible value to group by (since + // each document has a unique _id, it will just return the entire collection). 
We only + // apply the rewrite when grouping by a single field, so we cannot apply it in this case, + // where we are grouping by the entire document. + tassert(5943200, + "Optimization attempted on group by always-dissimilar system variable", + fieldPath.getFieldName(0) == "CURRENT" || fieldPath.getFieldName(0) == "ROOT"); + return nullptr; + } + + const auto groupId = fieldPath.tail().fullPath(); + + // We can't do this transformation if there are any non-$first accumulators. + for (auto&& accumulator : _accumulatedFields) { + if (AccumulatorDocumentsNeeded::kFirstDocument != + accumulator.makeAccumulator()->documentsNeeded()) { + return nullptr; + } + } + + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fields; + + boost::intrusive_ptr<Expression> idField; + // The _id field can be specified either as a fieldpath (ex. _id: "$a") or as a singleton + // object (ex. _id: {v: "$a"}). + if (_idFieldNames.empty()) { + idField = ExpressionFieldPath::deprecatedCreate(pExpCtx.get(), groupId); + } else { + invariant(_idFieldNames.size() == 1); + idField = ExpressionObject::create(pExpCtx.get(), + {{_idFieldNames.front(), _idExpressions.front()}}); + } + fields.push_back(std::make_pair("_id", idField)); + + for (auto&& accumulator : _accumulatedFields) { + fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument)); + + // Since we don't attempt this transformation for non-$first accumulators, + // the initializer should always be trivial. 
+ } + + return GroupFromFirstDocumentTransformation::create( + pExpCtx, groupId, getSourceName(), std::move(fields)); +} + +size_t DocumentSourceGroupBase::getMaxMemoryUsageBytes() const { + return _memoryTracker._maxAllowedMemoryUsageBytes; +} + +boost::optional<DocumentSource::DistributedPlanLogic> +DocumentSourceGroupBase::distributedPlanLogic() { + VariablesParseState vps = pExpCtx->variablesParseState; + /* the merger will use the same grouping key */ + auto mergerGroupByExpression = ExpressionFieldPath::parse(pExpCtx.get(), "$$ROOT._id", vps); + + std::vector<AccumulationStatement> mergerAccumulators; + mergerAccumulators.reserve(_accumulatedFields.size()); + for (auto&& accumulatedField : _accumulatedFields) { + // The merger's output field names will be the same, as will the accumulator factories. + // However, for some accumulators, the expression to be accumulated will be different. The + // original accumulator may be collecting an expression based on a field expression or + // constant. Here, we accumulate the output of the same name from the prior group. + auto copiedAccumulatedField = accumulatedField; + copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse( + pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps); + mergerAccumulators.emplace_back(std::move(copiedAccumulatedField)); + } + + // When merging, we always use generic hash based algorithm. + boost::intrusive_ptr<DocumentSourceGroup> mergingGroup = DocumentSourceGroup::create( + pExpCtx, std::move(mergerGroupByExpression), std::move(mergerAccumulators)); + mergingGroup->setDoingMerge(true); + // {shardsStage, mergingStage, sortPattern} + return DistributedPlanLogic{this, mergingGroup, boost::none}; +} + +} // namespace mongo + +#include "mongo/db/sorter/sorter.cpp" +// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. 
diff --git a/src/mongo/db/pipeline/document_source_group_base.h b/src/mongo/db/pipeline/document_source_group_base.h new file mode 100644 index 00000000000..9cbd71e9637 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_group_base.h @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <memory> +#include <utility> + +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/group_from_first_document_transformation.h" +#include "mongo/db/pipeline/memory_usage_tracker.h" +#include "mongo/db/sorter/sorter.h" + +namespace mongo { + +/** + * This class represents a $group stage generically - could be a streaming or hash based group. + * + * It contains some common execution code between the two algorithms, such as: + * - Handling spilling to disk. + * - Computing the group key + * - Accumulating values and populating output documents. + */ +class DocumentSourceGroupBase : public DocumentSource { +public: + using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>; + using GroupsMap = ValueUnorderedMap<Accumulators>; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + boost::intrusive_ptr<DocumentSource> optimize() final; + DepsTracker::State getDependencies(DepsTracker* deps) const final; + void addVariableRefs(std::set<Variables::Id>* refs) const final; + GetModPathsReturn getModifiedPaths() const final; + StringMap<boost::intrusive_ptr<Expression>> getIdFields() const; + + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; + + /** + * Can be used to change or swap out individual _id fields, but should not be used + * once execution has begun. + */ + std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields(); + const std::vector<AccumulationStatement>& getAccumulatedFields() const; + + /** + * Can be used to change or swap out individual accumulated fields, but should not be used + * once execution has begun. 
+ */ + std::vector<AccumulationStatement>& getMutableAccumulatedFields(); + + /** + * Reports this stage's constraints: it is a blocking stage that may write temporary data to + * disk, and it is allowed to swap places with a $match. + */ + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + StageConstraints constraints(StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kAllowed); + constraints.canSwapWithMatch = true; + return constraints; + } + + /** + * Add an accumulator, which will become a field in each Document that results from grouping. + */ + void addAccumulator(AccumulationStatement accumulationStatement); + + /** + * Sets the expression to use to determine the group id of each document. + */ + void setIdExpression(boost::intrusive_ptr<Expression> idExpression); + + /** + * Returns the expression to use to determine the group id of each document. + */ + boost::intrusive_ptr<Expression> getIdExpression() const; + + /** + * Returns true if this $group stage represents a 'global' $group which is merging together + * results from earlier partial groups. + */ + bool doingMerge() const { + return _doingMerge; + } + + /** + * Tell this source if it is doing a merge from shards. Defaults to false. + */ + void setDoingMerge(bool doingMerge) { + _doingMerge = doingMerge; + } + + /** + * Returns true if this $group stage used disk during execution and false otherwise. + */ + bool usedDisk() final { + return _stats.spills > 0; + } + + const SpecificStats* getSpecificStats() const final { + return &_stats; + } + + bool canRunInParallelBeforeWriteStage( + const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final; + + /** + * When possible, creates a document transformer that transforms the first document in a group + * into one of the output documents of the $group stage. This is possible when we are grouping + * on a single field and all accumulators are $first (or there are no accumulators). 
+ * + * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group, + * in which case this transformation can replace the actual $group stage in the pipeline + * (SERVER-9507). + */ + std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument() + const; + + /** + * Returns maximum allowed memory footprint. + */ + size_t getMaxMemoryUsageBytes() const; + + // True if this $group can be pushed down to SBE. + bool sbeCompatible() const { + return _sbeCompatible; + } + +protected: + DocumentSourceGroupBase(StringData stageName, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes = boost::none); + + /** + * Parses the stage specification: '_id' becomes the group key, '$doingMerge' enables merge + * mode, fields for which isSpecFieldReserved() returns true are skipped (they belong to the + * derived class), and every other field is parsed as an accumulator. + */ + void initializeFromBson(BSONElement elem); + virtual bool isSpecFieldReserved(StringData fieldName) = 0; + + void doDispose() final; + + /** + * Cleans up any pending memory usage. Throws an error if memory usage is above + * 'maxMemoryUsageBytes' and cannot spill to disk. + * + * Returns true if the caller should spill to disk, false otherwise. + */ + bool shouldSpillWithAttemptToSaveMemory(); + + /** + * Spills the groups map to disk and appends an iterator over the spilled data to + * '_sortedFiles'. Note: Since a sorted $group + * does not exhaust the previous stage before returning, and thus does not maintain as large a + * store of documents at any one time, only an unsorted group can spill to disk. + */ + void spill(); + + /** + * Computes the internal representation of the group key. 
+ */ + Value computeId(const Document& root); + + void processDocument(const Value& id, const Document& root); + + void readyGroups(); + void resetReadyGroups(); + + GetNextResult getNextReadyGroup(); + + void setExecutionStarted() { + _executionStarted = true; + } + + virtual void serializeAdditionalFields( + MutableDocument& out, boost::optional<ExplainOptions::Verbosity> explain) const {}; + + // If the expression for the '_id' field represents a non-empty object, we track its fields' + // names in '_idFieldNames'. + std::vector<std::string> _idFieldNames; + // Expressions for the individual fields when '_id' produces a document in the order of + // '_idFieldNames' or the whole expression otherwise. + std::vector<boost::intrusive_ptr<Expression>> _idExpressions; + +private: + GetNextResult getNextSpilled(); + GetNextResult getNextStandard(); + + /** + * If we ran out of memory, finish all the pending operations so that some memory + * can be freed. + */ + void freeMemory(); + + Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); + + /** + * Converts the internal representation of the group key to the _id shape specified by the + * user. + */ + Value expandId(const Value& val); + + /** + * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'. + */ + bool pathIncludedInGroupKeys(const std::string& dottedPath) const; + + std::vector<AccumulationStatement> _accumulatedFields; + + bool _doingMerge; + + MemoryUsageTracker _memoryTracker; + + GroupStats _stats; + + /** + * This flag should be set during first execution of getNext() to assert that non-const methods + * that expose internal structures are not called during runtime. + */ + bool _executionStarted; + + // We use boost::optional to defer initialization until the ExpressionContext containing the + // correct comparator is injected, since the groups must be built using the comparator's + // definition of equality. 
+ boost::optional<GroupsMap> _groups; + + std::shared_ptr<Sorter<Value, Value>::File> _file; + std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles; + bool _spilled; + + // Only used when '_spilled' is false. + GroupsMap::iterator _groupsIterator; + + // Only used when '_spilled' is true. + std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; + + std::pair<Value, Value> _firstPartOfNextGroup; + Accumulators _currentAccumulators; + + bool _sbeCompatible; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 8867356bc89..ef29cdb2795 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -48,6 +48,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_streaming_group.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/query/query_test_service_context.h" @@ -251,32 +252,65 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { return arr[0].getDocument().toBson(); } +enum class GroupStageType { Default, Streaming }; + class Base : public ServiceContextTest { public: - Base() + Base(GroupStageType groupStageType = GroupStageType::Default) : _opCtx(makeOperationContext()), _ctx(new ExpressionContextForTest(_opCtx.get(), AggregateCommandRequest(NamespaceString(ns), {}))), - _tempDir("DocumentSourceGroupTest") {} + _tempDir("DocumentSourceGroupTest"), + _groupStageType(groupStageType) {} protected: + StringData getStageName() const { + switch (_groupStageType) { + case GroupStageType::Default: + return DocumentSourceGroup::kStageName; + case GroupStageType::Streaming: + return DocumentSourceStreamingGroup::kStageName; + default: + MONGO_UNREACHABLE; 
+ } + } + + virtual boost::optional<size_t> getMaxMemoryUsageBytes() { + return boost::none; + } + + intrusive_ptr<DocumentSource> createFromBson( + BSONElement specElement, intrusive_ptr<ExpressionContext> expressionContext) { + switch (_groupStageType) { + case GroupStageType::Default: + return DocumentSourceGroup::createFromBsonWithMaxMemoryUsage( + std::move(specElement), expressionContext, getMaxMemoryUsageBytes()); + case GroupStageType::Streaming: + return DocumentSourceStreamingGroup::createFromBsonWithMaxMemoryUsage( + std::move(specElement), expressionContext, getMaxMemoryUsageBytes()); + default: + MONGO_UNREACHABLE; + } + } + void createGroup(const BSONObj& spec, bool inShard = false, bool inMongos = false) { - BSONObj namedSpec = BSON("$group" << spec); + BSONObj namedSpec = BSON(getStageName() << spec); BSONElement specElement = namedSpec.firstElement(); intrusive_ptr<ExpressionContextForTest> expressionContext = new ExpressionContextForTest( _opCtx.get(), AggregateCommandRequest(NamespaceString(ns), {})); + expressionContext->allowDiskUse = true; // For $group, 'inShard' implies 'fromMongos' and 'needsMerge'. expressionContext->fromMongos = expressionContext->needsMerge = inShard; expressionContext->inMongos = inMongos; // Won't spill to disk properly if it needs to. expressionContext->tempDir = _tempDir.path(); - _group = DocumentSourceGroup::createFromBson(specElement, expressionContext); + _group = createFromBson(specElement, expressionContext); assertRoundTrips(_group, expressionContext); } - DocumentSourceGroup* group() { - return static_cast<DocumentSourceGroup*>(_group.get()); + DocumentSourceGroupBase* group() { + return static_cast<DocumentSourceGroupBase*>(_group.get()); } /** Assert that iterator state accessors consistently report the source is exhausted. */ void assertEOF(const intrusive_ptr<DocumentSource>& source) const { @@ -298,8 +332,7 @@ private: // $const operators may be introduced in the first serialization. 
BSONObj spec = toBson(group); BSONElement specElement = spec.firstElement(); - intrusive_ptr<DocumentSource> generated = - DocumentSourceGroup::createFromBson(specElement, expCtx); + intrusive_ptr<DocumentSource> generated = createFromBson(specElement, expCtx); ASSERT_BSONOBJ_EQ(spec, toBson(generated)); } std::unique_ptr<QueryTestServiceContext> _queryServiceContext; @@ -307,6 +340,7 @@ private: intrusive_ptr<ExpressionContextForTest> _ctx; intrusive_ptr<DocumentSource> _group; TempDir _tempDir; + GroupStageType _groupStageType; }; class ParseErrorBase : public Base { @@ -354,10 +388,9 @@ class IdConstantBase : public ExpressionBase { class NonObject : public Base { public: void _doTest() final { - BSONObj spec = BSON("$group" - << "foo"); + BSONObj spec = BSON(getStageName() << "foo"); BSONElement specElement = spec.firstElement(); - ASSERT_THROWS(DocumentSourceGroup::createFromBson(specElement, ctx()), AssertionException); + ASSERT_THROWS(createFromBson(specElement, ctx()), AssertionException); } }; @@ -556,8 +589,10 @@ typedef map<Value, Document, ValueCmp> IdMap; class CheckResultsBase : public Base { public: + CheckResultsBase(GroupStageType groupStageType = GroupStageType::Default) + : Base(groupStageType) {} virtual ~CheckResultsBase() {} - void _doTest() { + void _doTest() override { runSharded(false); runSharded(true); } @@ -570,7 +605,7 @@ public: if (sharded) { sink = createMerger(); // Serialize and re-parse the shard stage. 
- createGroup(toBson(group())["$group"].Obj(), true); + createGroup(toBson(group())[group()->getSourceName()].Obj(), true); group()->setSource(source.get()); sink->setSource(group()); } @@ -870,6 +905,298 @@ public: } }; +class StreamingSimple final : public CheckResultsBase { +public: + StreamingSimple() : CheckResultsBase(GroupStageType::Streaming) {} + +private: + deque<DocumentSource::GetNextResult> inputData() final { + return {Document(BSON("a" << 1 << "b" << 1)), + Document(BSON("a" << 1 << "b" << 2)), + Document(BSON("a" << 2 << "b" << 3)), + Document(BSON("a" << 2 << "b" << 1))}; + } + BSONObj groupSpec() final { + return BSON("_id" + << "$a" + << "sum" + << BSON("$sum" + << "$b") + << "$monotonicIdFields" << BSON_ARRAY("_id")); + } + string expectedResultSetString() final { + return "[{_id:1,sum:3},{_id:2,sum:4}]"; + } +}; + +constexpr size_t kBigStringSize = 1024; +const std::string kBigString(kBigStringSize, 'a'); + +class CheckResultsAndSpills : public CheckResultsBase { +public: + CheckResultsAndSpills(GroupStageType groupStageType, uint64_t expectedSpills) + : CheckResultsBase(groupStageType), _expectedSpills(expectedSpills) {} + + void _doTest() final { + for (int sharded = 0; sharded < 2; ++sharded) { + runSharded(sharded); + const auto* groupStats = static_cast<const GroupStats*>(group()->getSpecificStats()); + ASSERT_EQ(groupStats->spills, _expectedSpills); + } + } + +private: + uint64_t _expectedSpills; +}; + +template <GroupStageType groupStageType, uint64_t expectedSpills> +class StreamingSpillTest : public CheckResultsAndSpills { +public: + StreamingSpillTest() : CheckResultsAndSpills(groupStageType, expectedSpills) {} + +private: + static constexpr int kCount = 11; + + deque<DocumentSource::GetNextResult> inputData() final { + deque<DocumentSource::GetNextResult> queue; + for (int i = 0; i < kCount; ++i) { + queue.emplace_back(Document(BSON("a" << i << "b" << kBigString))); + } + return queue; + } + + BSONObj groupSpec() final { + 
BSONObjBuilder builder; + builder.append("_id", "$a"); + BSONObjBuilder accum(builder.subobjStart("big_array")); + accum.append("$push", "$b"); + accum.done(); + + if constexpr (groupStageType == GroupStageType::Streaming) { + BSONArrayBuilder sub(builder.subarrayStart("$monotonicIdFields")); + sub.append("_id").done(); + } + return builder.done(); + } + + boost::optional<size_t> getMaxMemoryUsageBytes() final { + return 10 * kBigStringSize; + } + + BSONObj expectedResultSet() final { + BSONArrayBuilder result; + for (int i = 0; i < kCount; ++i) { + result.append(BSON("_id" << i << "big_array" << BSON_ARRAY(kBigString))); + } + return result.arr(); + } +}; + +class WithoutStreamingSpills final + : public StreamingSpillTest<GroupStageType::Default, 2 /*expectedSpills*/> {}; +class StreamingDoesNotSpill final + : public StreamingSpillTest<GroupStageType::Streaming, 0 /*expectedSpills*/> {}; + +class StreamingCanSpill final : public CheckResultsAndSpills { +public: + StreamingCanSpill() : CheckResultsAndSpills(GroupStageType::Streaming, 2 /*expectedSpills*/) {} + +private: + static constexpr int kCount = 11; + + deque<DocumentSource::GetNextResult> inputData() final { + deque<DocumentSource::GetNextResult> queue; + for (int i = 0; i < kCount; ++i) { + queue.emplace_back(Document(BSON("x" << 0 << "y" << i << "b" << kBigString))); + } + return queue; + } + + BSONObj groupSpec() final { + auto id = BSON("x" + << "$x" + << "y" + << "$y"); + return BSON("_id" << id << "big_array" + << BSON("$push" + << "$b") + << "$monotonicIdFields" << BSON_ARRAY("x")); + } + + boost::optional<size_t> getMaxMemoryUsageBytes() final { + return 10 * kBigStringSize; + } + + BSONObj expectedResultSet() final { + BSONArrayBuilder result; + for (int i = 0; i < kCount; ++i) { + auto id = BSON("x" << 0 << "y" << i); + result.append(BSON("_id" << id << "big_array" << BSON_ARRAY(kBigString))); + } + return result.done(); + } +}; + +class StreamingAlternatingSpillAndNoSpillBatches : public 
CheckResultsAndSpills { +public: + StreamingAlternatingSpillAndNoSpillBatches() + : CheckResultsAndSpills(GroupStageType::Streaming, 3 /*expectedSpills*/) {} + +private: + static constexpr int kCount = 12; + + deque<DocumentSource::GetNextResult> inputData() final { + deque<DocumentSource::GetNextResult> queue; + for (int i = 0; i < kCount; ++i) { + // For groups with i % 3 == 0 and i % 3 == 1 there should be no spilling, but groups + // with i % 3 == 2 should spill. + for (int j = 0; j < (i % 3) + 1; ++j) { + queue.emplace_back(Document(BSON("a" << i << "b" << kBigString))); + } + } + return queue; + } + + BSONObj groupSpec() final { + return BSON("_id" + << "$a" + << "big_array" + << BSON("$push" + << "$b") + << "$monotonicIdFields" << BSON_ARRAY("_id")); + } + + boost::optional<size_t> getMaxMemoryUsageBytes() final { + return (25 * kBigStringSize) / 10; + } + + BSONObj expectedResultSet() final { + BSONArrayBuilder result; + for (int i = 0; i < kCount; ++i) { + BSONArrayBuilder bigArray; + for (int j = 0; j < (i % 3) + 1; ++j) { + bigArray.append(kBigString); + } + result.append(BSON("_id" << i << "big_array" << bigArray.arr())); + } + return result.done(); + } +}; + +class StreamingComplex final : public CheckResultsBase { +public: + StreamingComplex() : CheckResultsBase(GroupStageType::Streaming) {} + +private: + static constexpr int kCount = 3; + + deque<DocumentSource::GetNextResult> inputData() final { + deque<DocumentSource::GetNextResult> queue; + for (int i = 0; i < kCount; ++i) { + for (int j = 0; j < kCount; ++j) { + for (int k = 0; k < kCount; ++k) { + queue.emplace_back(Document(BSON("x" << i << "y" << j << "z" << k))); + } + } + } + return queue; + } + + BSONObj groupSpec() final { + BSONObj id = BSON("x" + << "$x" + << "y" + << "$y"); + return BSON("_id" << id << "sum" + << BSON("$sum" + << "$z") + << "$monotonicIdFields" << BSON_ARRAY("x")); + } + + boost::optional<size_t> getMaxMemoryUsageBytes() final { + return 10 * kBigStringSize; + } + + 
BSONObj expectedResultSet() final { + BSONArrayBuilder result; + for (int i = 0; i < kCount; ++i) { + for (int j = 0; j < kCount; ++j) { + result.append(BSON("_id" << BSON("x" << i << "y" << j) << "sum" + << (kCount * (kCount - 1)) / 2)); + } + } + return result.arr(); + } +}; + +class StreamingMultipleMonotonicFields final : public CheckResultsBase { +public: + StreamingMultipleMonotonicFields() : CheckResultsBase(GroupStageType::Streaming) {} + +private: + static constexpr int kCount = 6; + deque<DocumentSource::GetNextResult> inputData() final { + deque<DocumentSource::GetNextResult> queue; + generateInputOutput([&queue](int x, int y) { + for (int i = 0; i < kCount; ++i) { + queue.emplace_back(Document(BSON("x" << x << "y" << y << "z" << i))); + } + }); + return queue; + } + + BSONObj groupSpec() final { + BSONObjBuilder builder; + + BSONObjBuilder id(builder.subobjStart("_id")); + id.append("x", "$x").append("y", "$y").done(); + + BSONObjBuilder accum(builder.subobjStart("sum")); + accum.append("$sum", "$z").done(); + + BSONArrayBuilder sub(builder.subarrayStart("$monotonicIdFields")); + sub.append("x").append("y").done(); + + return builder.done(); + } + + boost::optional<size_t> getMaxMemoryUsageBytes() final { + return 10 * kBigStringSize; + } + + BSONObj expectedResultSet() final { + BSONArrayBuilder result; + generateInputOutput([&result](int x, int y) { + BSONObjBuilder builder(result.subobjStart()); + + BSONObjBuilder id(builder.subobjStart("_id")); + id.append("x", x).append("y", y).done(); + + builder.append("sum", (kCount * (kCount - 1)) / 2); + builder.done(); + }); + return result.arr(); + } + + template <typename Callback> + void generateInputOutput(const Callback& callback) { + int x = 0; + int y = 0; + for (int i = 0; i < kCount; ++i) { + callback(x, y); + int state = i % 3; + if (state == 0) { + x++; + } else if (state == 1) { + y++; + } else { + x++; + y++; + } + } + } +}; + class All : public OldStyleSuiteSpecification { public: All() : 
OldStyleSuiteSpecification("DocumentSourceGroupTests") {} @@ -910,6 +1237,14 @@ public: add<Dependencies>(); add<StringConstantIdAndAccumulatorExpressions>(); add<ArrayConstantAccumulatorExpression>(); + + add<StreamingSimple>(); + add<WithoutStreamingSpills>(); + add<StreamingDoesNotSpill>(); + add<StreamingCanSpill>(); + add<StreamingAlternatingSpillAndNoSpillBatches>(); + add<StreamingComplex>(); + add<StreamingMultipleMonotonicFields>(); #if 0 // Disabled tests until SERVER-23318 is implemented. add<StreamingOptimization>(); diff --git a/src/mongo/db/pipeline/document_source_streaming_group.cpp b/src/mongo/db/pipeline/document_source_streaming_group.cpp new file mode 100644 index 00000000000..2d055900c90 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_streaming_group.cpp @@ -0,0 +1,256 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/exec/document_value/value_comparator.h" +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_streaming_group.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/expression_dependencies.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/stats/resource_consumption_metrics.h" +#include "mongo/util/destructor_guard.h" + +namespace mongo { + +/* + * $_internalStreamingGroup is an internal stage that is only used in certain cases by the + * pipeline optimizer. For now it should not be used anywhere outside the MongoDB server. 
+ */ +REGISTER_DOCUMENT_SOURCE(_internalStreamingGroup, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceStreamingGroup::createFromBson, + AllowedWithApiStrict::kAlways); + +constexpr StringData DocumentSourceStreamingGroup::kStageName; + +const char* DocumentSourceStreamingGroup::getSourceName() const { + return kStageName.rawData(); +} + +DocumentSource::GetNextResult DocumentSourceStreamingGroup::doGetNext() { + auto getReadyResult = getNextReadyGroup(); + if (!getReadyResult.isEOF()) { + return getReadyResult; + } else if (_sourceDepleted) { + dispose(); + return getReadyResult; + } + + auto prepareResult = readyNextBatch(); + if (prepareResult.isPaused()) { + return prepareResult; + } + return getNextReadyGroup(); +} + +DocumentSourceStreamingGroup::DocumentSourceStreamingGroup( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes) + : DocumentSourceGroupBase(kStageName, expCtx, maxMemoryUsageBytes), _sourceDepleted(false) {} + +boost::intrusive_ptr<DocumentSource> DocumentSourceStreamingGroup::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return createFromBsonWithMaxMemoryUsage(std::move(elem), expCtx, boost::none); +} + +boost::intrusive_ptr<DocumentSource> DocumentSourceStreamingGroup::createFromBsonWithMaxMemoryUsage( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes) { + boost::intrusive_ptr<DocumentSourceStreamingGroup> groupStage = + new DocumentSourceStreamingGroup(expCtx, maxMemoryUsageBytes); + groupStage->initializeFromBson(elem); + + const auto& monotonicIdFieldsElem = elem.Obj().getField(kMonotonicIdFieldsSpecField); + uassert(7026702, + "streaming group must specify an array of monotonic id fields " + + kMonotonicIdFieldsSpecField, + monotonicIdFieldsElem.type() == Array); + const auto& monotonicIdFields = monotonicIdFieldsElem.Array(); + if 
(groupStage->_idFieldNames.empty()) { + uassert(7026703, + "if there is no explicit id fields, " + kMonotonicIdFieldsSpecField + + " must contain a single \"_id\" string", + monotonicIdFields.size() == 1 && + monotonicIdFields[0].valueStringDataSafe() == "_id"_sd); + groupStage->_monotonicExpressionIndexes.push_back(0); + } else { + groupStage->_monotonicExpressionIndexes.reserve(monotonicIdFields.size()); + for (const auto& fieldNameElem : monotonicIdFields) { + uassert(7026704, + kMonotonicIdFieldsSpecField + " elements must be strings", + fieldNameElem.type() == String); + StringData fieldName = fieldNameElem.valueStringData(); + auto it = std::find( + groupStage->_idFieldNames.begin(), groupStage->_idFieldNames.end(), fieldName); + uassert(7026705, "id field not found", it != groupStage->_idFieldNames.end()); + groupStage->_monotonicExpressionIndexes.push_back( + std::distance(groupStage->_idFieldNames.begin(), it)); + } + std::sort(groupStage->_monotonicExpressionIndexes.begin(), + groupStage->_monotonicExpressionIndexes.end()); + } + + return groupStage; +} + +void DocumentSourceStreamingGroup::serializeAdditionalFields( + MutableDocument& out, boost::optional<ExplainOptions::Verbosity> explain) const { + std::vector<Value> monotonicIdFields; + if (_idFieldNames.empty()) { + monotonicIdFields.emplace_back("_id"_sd); + } else { + for (size_t i : _monotonicExpressionIndexes) { + monotonicIdFields.emplace_back(_idFieldNames[i]); + } + } + out[kMonotonicIdFieldsSpecField] = Value(std::move(monotonicIdFields)); +} + +bool DocumentSourceStreamingGroup::isSpecFieldReserved(StringData fieldName) { + return fieldName == kMonotonicIdFieldsSpecField; +} + +DocumentSource::GetNextResult DocumentSourceStreamingGroup::getNextDocument() { + if (_firstDocumentOfNextBatch) { + GetNextResult result = std::move(_firstDocumentOfNextBatch.value()); + _firstDocumentOfNextBatch.reset(); + return result; + } + return pSource->getNext(); +} + +DocumentSource::GetNextResult 
DocumentSourceStreamingGroup::readyNextBatch() { + resetReadyGroups(); + GetNextResult input = getNextDocument(); + return readyNextBatchInner(input); +} + +// This separate NOINLINE function is used here to decrease stack utilization of readyNextBatch() +// and prevent stack overflows. +MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult +DocumentSourceStreamingGroup::readyNextBatchInner(GetNextResult input) { + setExecutionStarted(); + // Calculate groups until we either exhaust pSource or encounter a change in a monotonic id + // expression, which means all current groups are finalized. + for (; input.isAdvanced(); input = pSource->getNext()) { + if (shouldSpillWithAttemptToSaveMemory()) { + spill(); + } + auto root = input.releaseDocument(); + Value id = computeId(root); + + if (isBatchFinished(id)) { + _firstDocumentOfNextBatch = std::move(root); + readyGroups(); + return input; + } + + processDocument(id, root); + } + + switch (input.getStatus()) { + case DocumentSource::GetNextResult::ReturnStatus::kAdvanced: { + MONGO_UNREACHABLE; // We consumed all advances above. + } + case DocumentSource::GetNextResult::ReturnStatus::kPauseExecution: { + return input; // Propagate pause. 
+ } + case DocumentSource::GetNextResult::ReturnStatus::kEOF: { + readyGroups(); + _sourceDepleted = true; + return input; + } + } + MONGO_UNREACHABLE; +} + +bool DocumentSourceStreamingGroup::isBatchFinished(const Value& id) { + if (_idExpressions.size() == 1) { + tassert(7026706, + "if there are no explicit id fields, it is only one monotonic expression with id 0", + _monotonicExpressionIndexes.size() == 1 && _monotonicExpressionIndexes[0] == 0); + return checkForBatchEndAndUpdateLastIdValues([&](size_t) { return id; }); + } else { + tassert(7026707, + "if there are explicit id fields, internal representation of id is an array", + id.isArray()); + const std::vector<Value>& idValues = id.getArray(); + return checkForBatchEndAndUpdateLastIdValues([&](size_t i) { return idValues[i]; }); + } +} + +template <typename IdValueGetter> +bool DocumentSourceStreamingGroup::checkForBatchEndAndUpdateLastIdValues( + const IdValueGetter& idValueGetter) { + auto assertStreamable = [&](Value value) { + // Nullish and array values will mess us up because they sort differently than they group. + // A null and a missing value will compare equal in sorting, but could result in different + // groups, e.g. {_id: {x: null, y: null}} vs {_id: {}}. An array value will sort by the min + // or max element, with no tie breaking, but group by the whole array. This means that two + // of the exact same array could appear in the input sequence, but with a different array in + // the middle of them, and that would still be considered sorted. That would break our + // batching group logic. + tassert(7026708, + "Monotonic value should not be missing, null or an array", + !value.nullish() && !value.isArray()); + return value; + }; + + // If _lastMonotonicIdFieldValues is empty, it is the first document, so the only thing we need + // to do is initialize it. 
+ if (_lastMonotonicIdFieldValues.empty()) { + for (size_t i : _monotonicExpressionIndexes) { + _lastMonotonicIdFieldValues.push_back(assertStreamable(idValueGetter(i))); + } + return false; + } else { + bool batchFinished = false; + for (size_t index = 0; index < _monotonicExpressionIndexes.size(); ++index) { + Value& oldId = _lastMonotonicIdFieldValues[index]; + const Value& id = assertStreamable(idValueGetter(_monotonicExpressionIndexes[index])); + if (pExpCtx->getValueComparator().compare(oldId, id) != 0) { + oldId = id; + batchFinished = true; + } + } + return batchFinished; + } +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_streaming_group.h b/src/mongo/db/pipeline/document_source_streaming_group.h new file mode 100644 index 00000000000..c939a6728f2 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_streaming_group.h @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <utility> + +#include "mongo/db/pipeline/document_source_group_base.h" + +namespace mongo { + +/** + * This class represents a streaming group implementation that can only be used when at least one + * of the _id fields is monotonic. It stores and outputs groups in batches. All groups in a batch + * have the same values of the monotonic id fields. + * + * For example, if the inputs are sorted by "x", we could use a batched streaming algorithm to + * perform the grouping for {$group: {_id: {x: "$x", y: "$y"}}}. + * + * Groups are processed in batches. One batch corresponds to a set of groups in which each + * monotonic id field has the same value. Non-monotonic fields can have different values, so we + * still may have multiple groups and even spill to disk, but we still consume significantly less + * memory than a general hash-based group. + * When a document with a different value in at least one monotonic id field is encountered, it is + * cached in '_firstDocumentOfNextBatch', the current groups are finalized and returned in + * subsequent getNext() calls, and when the current batch is depleted, memory is freed and the + * process starts again. + * + * TODO SERVER-71437 Implement an optimization for a special case where all group fields are + * monotonic + * - we don't need any hashing in this case. 
+ */ +class DocumentSourceStreamingGroup final : public DocumentSourceGroupBase { +public: + static constexpr StringData kStageName = "$_internalStreamingGroup"_sd; + + const char* getSourceName() const final; + + /** + * Parses 'elem' into a $_internalStreamingGroup stage, or throws an AssertionException if 'elem' + * was an invalid specification. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes); + +protected: + GetNextResult doGetNext() final; + + bool isSpecFieldReserved(StringData fieldName) final; + void serializeAdditionalFields(MutableDocument& out, + boost::optional<ExplainOptions::Verbosity> explain) const final; + +private: + static constexpr StringData kMonotonicIdFieldsSpecField = "$monotonicIdFields"_sd; + + explicit DocumentSourceStreamingGroup( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes = boost::none); + + + GetNextResult getNextDocument(); + + GetNextResult readyNextBatch(); + /** + * Readies next batch after all children are initialized. See readyNextBatch() for + * more details. 
+ */ + GetNextResult readyNextBatchInner(GetNextResult input); + + bool isBatchFinished(const Value& id); + + template <typename IdValueGetter> + bool checkForBatchEndAndUpdateLastIdValues(const IdValueGetter& idValueGetter); + + std::vector<size_t> _monotonicExpressionIndexes; + std::vector<Value> _lastMonotonicIdFieldValues; + + boost::optional<Document> _firstDocumentOfNextBatch; + + bool _sourceDepleted; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/group_from_first_document_transformation.cpp b/src/mongo/db/pipeline/group_from_first_document_transformation.cpp new file mode 100644 index 00000000000..411754eb744 --- /dev/null +++ b/src/mongo/db/pipeline/group_from_first_document_transformation.cpp @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/group_from_first_document_transformation.h" + +namespace mongo { +Document GroupFromFirstDocumentTransformation::applyTransformation(const Document& input) { + MutableDocument output(_accumulatorExprs.size()); + + for (auto&& expr : _accumulatorExprs) { + auto value = expr.second->evaluate(input, &expr.second->getExpressionContext()->variables); + output.addField(expr.first, value.missing() ? Value(BSONNULL) : std::move(value)); + } + + return output.freeze(); +} + +void GroupFromFirstDocumentTransformation::optimize() { + for (auto&& expr : _accumulatorExprs) { + expr.second = expr.second->optimize(); + } +} + +Document GroupFromFirstDocumentTransformation::serializeTransformation( + boost::optional<ExplainOptions::Verbosity> explain) const { + + MutableDocument newRoot(_accumulatorExprs.size()); + for (auto&& expr : _accumulatorExprs) { + newRoot.addField(expr.first, expr.second->serialize(static_cast<bool>(explain))); + } + + return {{"newRoot", newRoot.freezeToValue()}}; +} + +DepsTracker::State GroupFromFirstDocumentTransformation::addDependencies(DepsTracker* deps) const { + for (auto&& expr : _accumulatorExprs) { + expression::addDependencies(expr.second.get(), deps); + } + + // This stage will replace the entire document with a new document, so any existing fields + // will be replaced and cannot be required as dependencies. We use EXHAUSTIVE_ALL here + // instead of EXHAUSTIVE_FIELDS, as in ReplaceRootTransformation, because the stages that + // follow a $group stage should not depend on document metadata. 
+ return DepsTracker::State::EXHAUSTIVE_ALL; +} + +void GroupFromFirstDocumentTransformation::addVariableRefs(std::set<Variables::Id>* refs) const { + for (auto&& expr : _accumulatorExprs) { + expression::addVariableRefs(expr.second.get(), refs); + } +} + +DocumentSource::GetModPathsReturn GroupFromFirstDocumentTransformation::getModifiedPaths() const { + // Replaces the entire root, so all paths are modified. + return {DocumentSource::GetModPathsReturn::Type::kAllPaths, OrderedPathSet{}, {}}; +} + +std::unique_ptr<GroupFromFirstDocumentTransformation> GroupFromFirstDocumentTransformation::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const std::string& groupId, + StringData originalStageName, + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs) { + return std::make_unique<GroupFromFirstDocumentTransformation>( + groupId, originalStageName, std::move(accumulatorExprs)); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/group_from_first_document_transformation.h b/src/mongo/db/pipeline/group_from_first_document_transformation.h new file mode 100644 index 00000000000..a7beafc4989 --- /dev/null +++ b/src/mongo/db/pipeline/group_from_first_document_transformation.h @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/transformer_interface.h" + +namespace mongo { + +/** + * GroupFromFirstDocumentTransformation consists of a list of (field name, expression) pairs. It + * returns a document synthesized by assigning each field name in the output document to the result + * of evaluating the corresponding expression. If the expression evaluates to missing, we assign a + * value of BSONNULL. This is necessary to match the semantics of $first for missing fields. + */ +class GroupFromFirstDocumentTransformation final : public TransformerInterface { +public: + GroupFromFirstDocumentTransformation( + const std::string& groupId, + StringData originalStageName, + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs) + : _accumulatorExprs(std::move(accumulatorExprs)), + _groupId(groupId), + _originalStageName(originalStageName) {} + + TransformerType getType() const final { + return TransformerType::kGroupFromFirstDocument; + } + + /** + * The path of the field that we are grouping on: i.e., the field in the input document that we + * will use to create the _id field of the output document. 
+ */ + const std::string& groupId() const { + return _groupId; + } + + StringData originalStageName() const { + return _originalStageName; + } + + Document applyTransformation(const Document& input) final; + + void optimize() final; + + Document serializeTransformation( + boost::optional<ExplainOptions::Verbosity> explain) const final; + + DepsTracker::State addDependencies(DepsTracker* deps) const final; + + void addVariableRefs(std::set<Variables::Id>* refs) const final; + + DocumentSource::GetModPathsReturn getModifiedPaths() const final; + + static std::unique_ptr<GroupFromFirstDocumentTransformation> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const std::string& groupId, + StringData originalStageName, + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs); + +private: + std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs; + std::string _groupId; + StringData _originalStageName; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index ec84917b5c5..0998f2a6f1d 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -826,10 +826,10 @@ namespace { * the case of a $sort with a non-null value for getLimitSrc(), indicating that there was previously * a $limit stage that was optimized away. 
*/ -std::pair<boost::intrusive_ptr<DocumentSourceSort>, boost::intrusive_ptr<DocumentSourceGroup>> +std::pair<boost::intrusive_ptr<DocumentSourceSort>, boost::intrusive_ptr<DocumentSourceGroupBase>> getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) { boost::intrusive_ptr<DocumentSourceSort> sortStage = nullptr; - boost::intrusive_ptr<DocumentSourceGroup> groupStage = nullptr; + boost::intrusive_ptr<DocumentSourceGroupBase> groupStage = nullptr; auto sourcesIt = sources.begin(); if (sourcesIt != sources.end()) { @@ -845,7 +845,7 @@ getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) { } if (sourcesIt != sources.end()) { - groupStage = dynamic_cast<DocumentSourceGroup*>(sourcesIt->get()); + groupStage = dynamic_cast<DocumentSourceGroupBase*>(sourcesIt->get()); } return std::make_pair(sortStage, groupStage); @@ -1688,7 +1688,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // will handle the sort, and the groupTransform (added below) will handle the $group // stage. pipeline->popFrontWithName(DocumentSourceSort::kStageName); - pipeline->popFrontWithName(DocumentSourceGroup::kStageName); + pipeline->popFrontWithName(rewrittenGroupStage->originalStageName()); boost::intrusive_ptr<DocumentSource> groupTransform( new DocumentSourceSingleDocumentTransformation( |