summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Fefer <ivan.fefer@mongodb.com>2022-11-18 10:48:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-18 11:17:40 +0000
commit4cfd9b936b68622274e39100b7859ea8eb089ad8 (patch)
tree1a2c3a89c9e6cf8fece70d2f0cc9b7a5c72e610e
parent092b2eec8182bf540d73bd77649617ad2a36300d (diff)
downloadmongo-4cfd9b936b68622274e39100b7859ea8eb089ad8.tar.gz
SERVER-70267 Add DocumentSourceStreamingGroup
-rw-r--r--jstests/aggregation/group_conversion_to_distinct_scan.js25
-rw-r--r--src/mongo/db/pipeline/SConscript3
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp870
-rw-r--r--src/mongo/db/pipeline/document_source_group.h268
-rw-r--r--src/mongo/db/pipeline/document_source_group_base.cpp821
-rw-r--r--src/mongo/db/pipeline/document_source_group_base.h267
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp361
-rw-r--r--src/mongo/db/pipeline/document_source_streaming_group.cpp256
-rw-r--r--src/mongo/db/pipeline/document_source_streaming_group.h114
-rw-r--r--src/mongo/db/pipeline/group_from_first_document_transformation.cpp93
-rw-r--r--src/mongo/db/pipeline/group_from_first_document_transformation.h94
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp8
12 files changed, 2078 insertions, 1102 deletions
diff --git a/jstests/aggregation/group_conversion_to_distinct_scan.js b/jstests/aggregation/group_conversion_to_distinct_scan.js
index 95a55fec612..ba2f55ec64b 100644
--- a/jstests/aggregation/group_conversion_to_distinct_scan.js
+++ b/jstests/aggregation/group_conversion_to_distinct_scan.js
@@ -42,7 +42,7 @@ function createIndexes() {
}
createIndexes();
-assert.commandWorked(coll.insert([
+const documents = [
{_id: 0, a: 1, b: 1, c: 1},
{_id: 1, a: 1, b: 2, c: 2},
{_id: 2, a: 1, b: 2, c: 3},
@@ -71,7 +71,8 @@ assert.commandWorked(coll.insert([
{_id: 21, str: "FoO", d: 2},
{_id: 22, str: "bar", d: 4},
{_id: 23, str: "bAr", d: 3}
-]));
+];
+assert.commandWorked(coll.insert(documents));
// Helper for dropping an index and removing it from the list of indexes.
function removeIndex(pattern) {
@@ -741,4 +742,24 @@ assertResultsMatchWithAndWithoutHintandIndexes(
explain = coll.explain().aggregate(pipeline, collationOption);
assert.neq(null, getAggPlanStage(explain, "DISTINCT_SCAN"), explain);
assert.eq({str: 1, d: 1}, getAggPlanStage(explain, "DISTINCT_SCAN").keyPattern);
+
+//
+// Verify that a $sort-$_internalStreamingGroup pipeline can use DISTINCT_SCAN
+//
+pipeline = [
+ {$sort: {_id: 1}},
+ {$_internalStreamingGroup: {_id: "$_id", value: {$first: "$a"}, $monotonicIdFields: ["_id"]}}
+];
+const expectedResult = [];
+for (let i = 0; i <= 23; i++) {
+ let resultDocument = {_id: i, value: null};
+ if (documents[i].hasOwnProperty("a")) {
+ resultDocument["value"] = documents[i].a;
+ }
+ expectedResult.push(resultDocument);
+}
+assertResultsMatchWithAndWithoutHintandIndexes(pipeline, expectedResult);
+explain = coll.explain().aggregate(pipeline);
+assert.neq(null, getAggPlanStage(explain, "DISTINCT_SCAN"), explain);
+assert.eq({_id: 1}, getAggPlanStage(explain, "DISTINCT_SCAN").keyPattern);
}());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 522dff15844..d3f93a2da8a 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -263,6 +263,7 @@ pipelineEnv.Library(
'document_source_geo_near.cpp',
'document_source_graph_lookup.cpp',
'document_source_group.cpp',
+ 'document_source_group_base.cpp',
'document_source_index_stats.cpp',
'document_source_internal_all_collection_stats.cpp',
'document_source_internal_compute_geo_near_distance.cpp',
@@ -296,10 +297,12 @@ pipelineEnv.Library(
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_sort_by_count.cpp',
+ 'document_source_streaming_group.cpp',
'document_source_tee_consumer.cpp',
'document_source_telemetry.cpp',
'document_source_union_with.cpp',
'document_source_unwind.cpp',
+ 'group_from_first_document_transformation.cpp',
'pipeline.cpp',
'search_helper.cpp',
'semantic_analysis.cpp',
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 13694a79a60..15d208ea252 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -46,88 +46,6 @@
namespace mongo {
-namespace {
-
-/**
- * Generates a new file name on each call using a static, atomic and monotonically increasing
- * number.
- *
- * Each user of the Sorter must implement this function to ensure that all temporary files that the
- * Sorter instances produce are uniquely identified using a unique file name extension with separate
- * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
- * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
- */
-std::string nextFileName() {
- static AtomicWord<unsigned> documentSourceGroupFileCounter;
- return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1));
-}
-
-} // namespace
-
-using boost::intrusive_ptr;
-using std::pair;
-using std::shared_ptr;
-using std::vector;
-
-Document GroupFromFirstDocumentTransformation::applyTransformation(const Document& input) {
- MutableDocument output(_accumulatorExprs.size());
-
- for (auto&& expr : _accumulatorExprs) {
- auto value = expr.second->evaluate(input, &expr.second->getExpressionContext()->variables);
- output.addField(expr.first, value.missing() ? Value(BSONNULL) : std::move(value));
- }
-
- return output.freeze();
-}
-
-void GroupFromFirstDocumentTransformation::optimize() {
- for (auto&& expr : _accumulatorExprs) {
- expr.second = expr.second->optimize();
- }
-}
-
-Document GroupFromFirstDocumentTransformation::serializeTransformation(
- boost::optional<ExplainOptions::Verbosity> explain) const {
-
- MutableDocument newRoot(_accumulatorExprs.size());
- for (auto&& expr : _accumulatorExprs) {
- newRoot.addField(expr.first, expr.second->serialize(static_cast<bool>(explain)));
- }
-
- return {{"newRoot", newRoot.freezeToValue()}};
-}
-
-DepsTracker::State GroupFromFirstDocumentTransformation::addDependencies(DepsTracker* deps) const {
- for (auto&& expr : _accumulatorExprs) {
- expression::addDependencies(expr.second.get(), deps);
- }
-
- // This stage will replace the entire document with a new document, so any existing fields
- // will be replaced and cannot be required as dependencies. We use EXHAUSTIVE_ALL here
- // instead of EXHAUSTIVE_FIELDS, as in ReplaceRootTransformation, because the stages that
- // follow a $group stage should not depend on document metadata.
- return DepsTracker::State::EXHAUSTIVE_ALL;
-}
-
-void GroupFromFirstDocumentTransformation::addVariableRefs(std::set<Variables::Id>* refs) const {
- for (auto&& expr : _accumulatorExprs) {
- expression::addVariableRefs(expr.second.get(), refs);
- }
-}
-
-DocumentSource::GetModPathsReturn GroupFromFirstDocumentTransformation::getModifiedPaths() const {
- // Replaces the entire root, so all paths are modified.
- return {DocumentSource::GetModPathsReturn::Type::kAllPaths, OrderedPathSet{}, {}};
-}
-
-std::unique_ptr<GroupFromFirstDocumentTransformation> GroupFromFirstDocumentTransformation::create(
- const intrusive_ptr<ExpressionContext>& expCtx,
- const std::string& groupId,
- vector<pair<std::string, intrusive_ptr<Expression>>> accumulatorExprs) {
- return std::make_unique<GroupFromFirstDocumentTransformation>(groupId,
- std::move(accumulatorExprs));
-}
-
constexpr StringData DocumentSourceGroup::kStageName;
REGISTER_DOCUMENT_SOURCE(group,
@@ -139,527 +57,77 @@ const char* DocumentSourceGroup::getSourceName() const {
return kStageName.rawData();
}
-bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory() {
- if (!_memoryTracker._allowDiskUse &&
- (_memoryTracker.currentMemoryBytes() >
- static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes))) {
- freeMemory();
- }
-
- if (_memoryTracker.currentMemoryBytes() >
- static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) {
- uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- "Exceeded memory limit for $group, but didn't allow external sort."
- " Pass allowDiskUse:true to opt in.",
- _memoryTracker._allowDiskUse);
- _memoryTracker.resetCurrent();
- return true;
- }
- return false;
-}
-
-void DocumentSourceGroup::freeMemory() {
- invariant(_groups);
- for (auto&& group : *_groups) {
- for (size_t i = 0; i < group.second.size(); i++) {
- // Subtract the current usage.
- _memoryTracker.update(_accumulatedFields[i].fieldName,
- -1 * group.second[i]->getMemUsage());
-
- group.second[i]->reduceMemoryConsumptionIfAble();
-
- // Update the memory usage for this AccumulationStatement.
- _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage());
- }
- }
-}
-
-DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
- if (!_initialized) {
- const auto initializationResult = initialize();
- if (initializationResult.isPaused()) {
- return initializationResult;
- }
- invariant(initializationResult.isEOF());
- }
-
- for (auto&& accum : _currentAccumulators) {
- accum->reset(); // Prep accumulators for a new group.
- }
-
- if (_spilled) {
- return getNextSpilled();
- } else {
- return getNextStandard();
- }
-}
-
-DocumentSource::GetNextResult DocumentSourceGroup::getNextSpilled() {
- // We aren't streaming, and we have spilled to disk.
- if (!_sorterIterator)
- return GetNextResult::makeEOF();
-
- _currentId = _firstPartOfNextGroup.first;
- const size_t numAccumulators = _accumulatedFields.size();
-
- // Call startNewGroup on every accumulator.
- Value expandedId = expandId(_currentId);
- Document idDoc =
- expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
- for (size_t i = 0; i < numAccumulators; ++i) {
- Value initializerValue =
- _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables);
- _currentAccumulators[i]->startNewGroup(initializerValue);
- }
-
- while (pExpCtx->getValueComparator().evaluate(_currentId == _firstPartOfNextGroup.first)) {
- // Inside of this loop, _firstPartOfNextGroup is the current data being processed.
- // At loop exit, it is the first value to be processed in the next group.
- switch (numAccumulators) { // mirrors switch in spill()
- case 1: // Single accumulators serialize as a single Value.
- _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true);
- [[fallthrough]];
- case 0: // No accumulators so no Values.
- break;
- default: { // Multiple accumulators serialize as an array of Values.
- const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray();
- for (size_t i = 0; i < numAccumulators; i++) {
- _currentAccumulators[i]->process(accumulatorStates[i], true);
- }
- }
- }
-
- if (!_sorterIterator->more()) {
- dispose();
- break;
- }
-
- _firstPartOfNextGroup = _sorterIterator->next();
- }
-
- return makeDocument(_currentId, _currentAccumulators, pExpCtx->needsMerge);
-}
-
-DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() {
- // Not spilled, and not streaming.
- if (_groups->empty())
- return GetNextResult::makeEOF();
-
- Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->needsMerge);
-
- if (++groupsIterator == _groups->end())
- dispose();
-
- return out;
-}
-
-void DocumentSourceGroup::doDispose() {
- // Free our resources.
- _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
- _sorterIterator.reset();
-
- // Make us look done.
- groupsIterator = _groups->end();
-}
-
-intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
- // Optimizing a 'DocumentSourceGroup' might modify its expressions to become incompatible with
- // SBE. We temporarily highjack the context's 'sbeCompatible' flag to communicate the situation
- // back to the 'DocumentSourceGroup'. Notice, that while a particular 'DocumentSourceGroup'
- // might become incompatible with SBE, other groups in the pipeline and the collection access
- // could be still eligible for lowering to SBE, thus we must reset the context's 'sbeCompatible'
- // flag back to its original value at the end of the 'optimize()' call.
- //
- // TODO SERVER-XXXXX: replace this hack with a proper per-stage tracking of SBE compatibility.
- auto expCtx = _idExpressions[0]->getExpressionContext();
- auto orgSbeCompatible = expCtx->sbeCompatible;
- expCtx->sbeCompatible = true;
-
- // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there
- // will be only one group. We should take advantage of that to avoid going through the hash
- // table.
- for (size_t i = 0; i < _idExpressions.size(); i++) {
- _idExpressions[i] = _idExpressions[i]->optimize();
- }
-
- for (auto&& accumulatedField : _accumulatedFields) {
- accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize();
- accumulatedField.expr.argument = accumulatedField.expr.argument->optimize();
- }
-
- _sbeCompatible = _sbeCompatible && expCtx->sbeCompatible;
- expCtx->sbeCompatible = orgSbeCompatible;
-
- return this;
-}
-
-Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- MutableDocument insides;
-
- // Add the _id.
- if (_idFieldNames.empty()) {
- invariant(_idExpressions.size() == 1);
- insides["_id"] = _idExpressions[0]->serialize(static_cast<bool>(explain));
- } else {
- // Decomposed document case.
- invariant(_idExpressions.size() == _idFieldNames.size());
- MutableDocument md;
- for (size_t i = 0; i < _idExpressions.size(); i++) {
- md[_idFieldNames[i]] = _idExpressions[i]->serialize(static_cast<bool>(explain));
- }
- insides["_id"] = md.freezeToValue();
- }
-
- // Add the remaining fields.
- for (auto&& accumulatedField : _accumulatedFields) {
- intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator();
- insides[accumulatedField.fieldName] =
- Value(accum->serialize(accumulatedField.expr.initializer,
- accumulatedField.expr.argument,
- static_cast<bool>(explain)));
- }
-
- if (_doingMerge) {
- // This makes the output unparsable (with error) on pre 2.6 shards, but it will never
- // be sent to old shards when this flag is true since they can't do a merge anyway.
- insides["$doingMerge"] = Value(true);
- }
-
- MutableDocument out;
- out[getSourceName()] = Value(insides.freeze());
-
- if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
- MutableDocument md;
-
- for (size_t i = 0; i < _accumulatedFields.size(); i++) {
- md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>(
- _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes()));
- }
-
- out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue());
- out["totalOutputDataSizeBytes"] =
- Value(static_cast<long long>(_stats.totalOutputDataSizeBytes));
- out["usedDisk"] = Value(_stats.spills > 0);
- out["spills"] = Value(static_cast<long long>(_stats.spills));
- }
-
- return Value(out.freezeToValue());
-}
-
-DepsTracker::State DocumentSourceGroup::getDependencies(DepsTracker* deps) const {
- // add the _id
- for (size_t i = 0; i < _idExpressions.size(); i++) {
- expression::addDependencies(_idExpressions[i].get(), deps);
- }
-
- // add the rest
- for (auto&& accumulatedField : _accumulatedFields) {
- expression::addDependencies(accumulatedField.expr.argument.get(), deps);
- // Don't add initializer, because it doesn't refer to docs from the input stream.
- }
-
- return DepsTracker::State::EXHAUSTIVE_ALL;
-}
-
-void DocumentSourceGroup::addVariableRefs(std::set<Variables::Id>* refs) const {
- for (const auto& idExpr : _idExpressions) {
- expression::addVariableRefs(idExpr.get(), refs);
- }
-
- for (auto&& accumulatedField : _accumulatedFields) {
- expression::addVariableRefs(accumulatedField.expr.argument.get(), refs);
- }
-}
-
-DocumentSource::GetModPathsReturn DocumentSourceGroup::getModifiedPaths() const {
- // We preserve none of the fields, but any fields referenced as part of the group key are
- // logically just renamed.
- StringMap<std::string> renames;
- for (std::size_t i = 0; i < _idExpressions.size(); ++i) {
- auto idExp = _idExpressions[i];
- auto pathToPutResultOfExpression =
- _idFieldNames.empty() ? "_id" : "_id." + _idFieldNames[i];
- auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression);
- for (auto&& rename : computedPaths.renames) {
- renames[rename.first] = rename.second;
- }
- }
-
- return {DocumentSource::GetModPathsReturn::Type::kAllExcept,
- OrderedPathSet{}, // No fields are preserved.
- std::move(renames)};
-}
-
-StringMap<boost::intrusive_ptr<Expression>> DocumentSourceGroup::getIdFields() const {
- if (_idFieldNames.empty()) {
- invariant(_idExpressions.size() == 1);
- return {{"_id", _idExpressions[0]}};
- } else {
- invariant(_idFieldNames.size() == _idExpressions.size());
- StringMap<boost::intrusive_ptr<Expression>> result;
- for (std::size_t i = 0; i < _idFieldNames.size(); ++i) {
- result["_id." + _idFieldNames[i]] = _idExpressions[i];
- }
- return result;
- }
-}
-
-std::vector<boost::intrusive_ptr<Expression>>& DocumentSourceGroup::getMutableIdFields() {
- tassert(7020503, "Can't mutate _id fields after initialization", !_initialized);
- return _idExpressions;
-}
-
-const std::vector<AccumulationStatement>& DocumentSourceGroup::getAccumulatedFields() const {
- return _accumulatedFields;
-}
-
-std::vector<AccumulationStatement>& DocumentSourceGroup::getMutableAccumulatedFields() {
- tassert(7020504, "Can't mutate accumulated fields after initialization", !_initialized);
- return _accumulatedFields;
-}
-
-intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
- const intrusive_ptr<ExpressionContext>& expCtx,
+boost::intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const boost::intrusive_ptr<Expression>& groupByExpression,
std::vector<AccumulationStatement> accumulationStatements,
boost::optional<size_t> maxMemoryUsageBytes) {
- intrusive_ptr<DocumentSourceGroup> groupStage(
+ boost::intrusive_ptr<DocumentSourceGroup> groupStage(
new DocumentSourceGroup(expCtx, maxMemoryUsageBytes));
groupStage->setIdExpression(groupByExpression);
for (auto&& statement : accumulationStatements) {
groupStage->addAccumulator(statement);
- groupStage->_memoryTracker.set(statement.fieldName, 0);
}
return groupStage;
}
-DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& expCtx,
+DocumentSourceGroup::DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<size_t> maxMemoryUsageBytes)
- : DocumentSource(kStageName, expCtx),
- _doingMerge(false),
- _memoryTracker{expCtx->allowDiskUse && !expCtx->inMongos,
- maxMemoryUsageBytes
- ? *maxMemoryUsageBytes
- : static_cast<size_t>(internalDocumentSourceGroupMaxMemoryBytes.load())},
- _initialized(false),
- _groups(expCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
- _spilled(false),
- _sbeCompatible(false) {}
+ : DocumentSourceGroupBase(kStageName, expCtx, maxMemoryUsageBytes), _groupsReady(false) {}
-void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
- _accumulatedFields.push_back(accumulationStatement);
+boost::intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return createFromBsonWithMaxMemoryUsage(std::move(elem), expCtx, boost::none);
}
-namespace {
-
-intrusive_ptr<Expression> parseIdExpression(const intrusive_ptr<ExpressionContext>& expCtx,
- BSONElement groupField,
- const VariablesParseState& vps) {
- if (groupField.type() == Object) {
- // {_id: {}} is treated as grouping on a constant, not an expression
- if (groupField.Obj().isEmpty()) {
- return ExpressionConstant::create(expCtx.get(), Value(groupField));
- }
-
- const BSONObj idKeyObj = groupField.Obj();
- if (idKeyObj.firstElementFieldName()[0] == '$') {
- // grouping on a $op expression
- return Expression::parseObject(expCtx.get(), idKeyObj, vps);
- } else {
- for (auto&& field : idKeyObj) {
- uassert(17390,
- "$group does not support inclusion-style expressions",
- !field.isNumber() && field.type() != Bool);
- }
- return ExpressionObject::parse(expCtx.get(), idKeyObj, vps);
- }
- } else {
- return Expression::parseOperand(expCtx.get(), groupField, vps);
- }
+boost::intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBsonWithMaxMemoryUsage(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes) {
+ boost::intrusive_ptr<DocumentSourceGroup> groupStage(
+ new DocumentSourceGroup(expCtx, maxMemoryUsageBytes));
+ groupStage->initializeFromBson(elem);
+ return groupStage;
}
-} // namespace
-
-void DocumentSourceGroup::setIdExpression(const boost::intrusive_ptr<Expression> idExpression) {
- if (auto object = dynamic_cast<ExpressionObject*>(idExpression.get())) {
- auto& childExpressions = object->getChildExpressions();
- invariant(!childExpressions.empty()); // We expect to have converted an empty object into a
- // constant expression.
-
- // grouping on an "artificial" object. Rather than create the object for each input
- // in initialize(), instead group on the output of the raw expressions. The artificial
- // object will be created at the end in makeDocument() while outputting results.
- for (auto&& childExpPair : childExpressions) {
- _idFieldNames.push_back(childExpPair.first);
- _idExpressions.push_back(childExpPair.second);
+DocumentSource::GetNextResult DocumentSourceGroup::doGetNext() {
+ if (!_groupsReady) {
+ const auto initializationResult = performBlockingGroup();
+ if (initializationResult.isPaused()) {
+ return initializationResult;
}
- } else {
- _idExpressions.push_back(idExpression);
- }
-}
-
-boost::intrusive_ptr<Expression> DocumentSourceGroup::getIdExpression() const {
- // _idFieldNames is empty and _idExpressions has one element when the _id expression is not an
- // object expression.
- if (_idFieldNames.empty() && _idExpressions.size() == 1) {
- return _idExpressions[0];
- }
-
- tassert(6586300,
- "Field and its expression must be always paired in ExpressionObject",
- _idFieldNames.size() > 0 && _idFieldNames.size() == _idExpressions.size());
-
- // Each expression in '_idExpressions' may have been optimized and so, compose the object _id
- // expression out of the optimized expressions.
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fieldsAndExprs;
- for (size_t i = 0; i < _idExpressions.size(); ++i) {
- fieldsAndExprs.emplace_back(_idFieldNames[i], _idExpressions[i]);
+ invariant(initializationResult.isEOF());
}
- return ExpressionObject::create(_idExpressions[0]->getExpressionContext(),
- std::move(fieldsAndExprs));
-}
-
-intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
- BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
- uassert(15947, "a group's fields must be specified in an object", elem.type() == Object);
-
- intrusive_ptr<DocumentSourceGroup> groupStage(new DocumentSourceGroup(expCtx));
-
- BSONObj groupObj(elem.Obj());
- BSONObjIterator groupIterator(groupObj);
- VariablesParseState vps = expCtx->variablesParseState;
- expCtx->sbeGroupCompatible = true;
- while (groupIterator.more()) {
- BSONElement groupField(groupIterator.next());
- StringData pFieldName = groupField.fieldNameStringData();
- if (pFieldName == "_id") {
- uassert(15948,
- "a group's _id may only be specified once",
- groupStage->_idExpressions.empty());
- groupStage->setIdExpression(parseIdExpression(expCtx, groupField, vps));
- invariant(!groupStage->_idExpressions.empty());
- } else if (pFieldName == "$doingMerge") {
- massert(17030, "$doingMerge should be true if present", groupField.Bool());
-
- groupStage->setDoingMerge(true);
- } else {
- // Any other field will be treated as an accumulator specification.
- groupStage->addAccumulator(
- AccumulationStatement::parseAccumulationStatement(expCtx.get(), groupField, vps));
- groupStage->_memoryTracker.set(pFieldName, 0);
- }
+ auto result = getNextReadyGroup();
+ if (result.isEOF()) {
+ dispose();
}
- groupStage->_sbeCompatible = expCtx->sbeGroupCompatible && expCtx->sbeCompatible;
-
- uassert(
- 15955, "a group specification must include an _id", !groupStage->_idExpressions.empty());
- return groupStage;
+ return result;
}
-namespace {
-
-using GroupsMap = DocumentSourceGroup::GroupsMap;
-
-class SorterComparator {
-public:
- SorterComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {}
-
- int operator()(const Value& lhs, const Value& rhs) const {
- return _valueComparator.compare(lhs, rhs);
- }
-
-private:
- ValueComparator _valueComparator;
-};
-
-class SpillSTLComparator {
-public:
- SpillSTLComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {}
-
- bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const {
- return _valueComparator.evaluate(lhs->first < rhs->first);
- }
-
-private:
- ValueComparator _valueComparator;
-};
-} // namespace
-
-DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
+DocumentSource::GetNextResult DocumentSourceGroup::performBlockingGroup() {
GetNextResult input = pSource->getNext();
- return initializeSelf(input);
+ return performBlockingGroupSelf(input);
}
-// This separate NOINLINE function is used here to decrease stack utilization of initialize() and
-// prevent stack overflows.
-MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initializeSelf(
+// This separate NOINLINE function is used here to decrease stack utilization of
+// performBlockingGroup() and prevent stack overflows.
+MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::performBlockingGroupSelf(
GetNextResult input) {
- const size_t numAccumulators = _accumulatedFields.size();
+ setExecutionStarted();
// Barring any pausing, this loop exhausts 'pSource' and populates '_groups'.
for (; input.isAdvanced(); input = pSource->getNext()) {
if (shouldSpillWithAttemptToSaveMemory()) {
- _sortedFiles.push_back(spill());
+ spill();
}
// We release the result document here so that it does not outlive the end of this loop
// iteration. Not releasing could lead to an array copy when this group follows an unwind.
auto rootDocument = input.releaseDocument();
Value id = computeId(rootDocument);
-
- // Look for the _id value in the map. If it's not there, add a new entry with a blank
- // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and
- // looking it up in '_groups' multiple times.
- const size_t oldSize = _groups->size();
- vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id];
- const bool inserted = _groups->size() != oldSize;
-
- vector<uint64_t> oldAccumMemUsage(numAccumulators, 0);
- if (inserted) {
- _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize());
-
- // Initialize and add the accumulators
- Value expandedId = expandId(id);
- Document idDoc =
- expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
- group.reserve(numAccumulators);
- for (auto&& accumulatedField : _accumulatedFields) {
- auto accum = accumulatedField.makeAccumulator();
- Value initializerValue =
- accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables);
- accum->startNewGroup(initializerValue);
- group.push_back(accum);
- }
- }
-
- /* tickle all the accumulators for the group we found */
- dassert(numAccumulators == group.size());
-
- for (size_t i = 0; i < numAccumulators; i++) {
- // Only process the input and update the memory footprint if the current accumulator
- // needs more input.
- if (group[i]->needsInput()) {
- const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage();
- group[i]->process(_accumulatedFields[i].expr.argument->evaluate(
- rootDocument, &pExpCtx->variables),
- _doingMerge);
- _memoryTracker.update(_accumulatedFields[i].fieldName,
- group[i]->getMemUsage() - prevMemUsage);
- }
- }
-
- if (kDebugBuild && !pExpCtx->opCtx->readOnly()) {
- // In debug mode, spill every time we have a duplicate id to stress merge logic.
- if (!inserted && // is a dup
- !pExpCtx->inMongos && // can't spill to disk in mongos
- !_memoryTracker
- ._allowDiskUse && // don't change behavior when testing external sort
- _sortedFiles.size() < 20) { // don't open too many FDs
-
- _sortedFiles.push_back(spill());
- }
- }
+ processDocument(id, rootDocument);
}
switch (input.getStatus()) {
@@ -670,280 +138,14 @@ MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult DocumentSourceGroup::initi
return input; // Propagate pause.
}
case DocumentSource::GetNextResult::ReturnStatus::kEOF: {
- // Do any final steps necessary to prepare to output results.
- if (!_sortedFiles.empty()) {
- _spilled = true;
- if (!_groups->empty()) {
- _sortedFiles.push_back(spill());
- }
-
- // We won't be using groups again so free its memory.
- _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
-
- _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
- _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
-
- // prepare current to accumulate data
- _currentAccumulators.reserve(numAccumulators);
- for (auto&& accumulatedField : _accumulatedFields) {
- _currentAccumulators.push_back(accumulatedField.makeAccumulator());
- }
-
- verify(_sorterIterator->more()); // we put data in, we should get something out.
- _firstPartOfNextGroup = _sorterIterator->next();
- } else {
- // start the group iterator
- groupsIterator = _groups->begin();
- }
-
+ readyGroups();
// This must happen last so that, unless control gets here, we will re-enter
// initialization after getting a GetNextResult::ResultState::kPauseExecution.
- _initialized = true;
+ _groupsReady = true;
return input;
}
}
MONGO_UNREACHABLE;
}
-shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
- _stats.spills++;
-
- vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting
- ptrs.reserve(_groups->size());
- for (GroupsMap::const_iterator it = _groups->begin(), end = _groups->end(); it != end; ++it) {
- ptrs.push_back(&*it);
- }
-
- stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
-
- // Initialize '_file' in a lazy manner only when it is needed.
- if (!_file) {
- _file =
- std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" + nextFileName());
- }
- SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file);
- switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i.
- case 0: // no values, essentially a distinct
- for (size_t i = 0; i < ptrs.size(); i++) {
- writer.addAlreadySorted(ptrs[i]->first, Value());
- }
- break;
-
- case 1: // just one value, use optimized serialization as single Value
- for (size_t i = 0; i < ptrs.size(); i++) {
- writer.addAlreadySorted(ptrs[i]->first,
- ptrs[i]->second[0]->getValue(/*toBeMerged=*/true));
- }
- break;
-
- default: // multiple values, serialize as array-typed Value
- for (size_t i = 0; i < ptrs.size(); i++) {
- vector<Value> accums;
- for (size_t j = 0; j < ptrs[i]->second.size(); j++) {
- accums.push_back(ptrs[i]->second[j]->getValue(/*toBeMerged=*/true));
- }
- writer.addAlreadySorted(ptrs[i]->first, Value(std::move(accums)));
- }
- break;
- }
-
- auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx);
- metricsCollector.incrementKeysSorted(ptrs.size());
- metricsCollector.incrementSorterSpills(1);
-
- _groups->clear();
- // Zero out the current per-accumulation statement memory consumption, as the memory has been
- // freed by spilling.
- for (const auto& accum : _accumulatedFields) {
- _memoryTracker.set(accum.fieldName, 0);
- }
-
- Sorter<Value, Value>::Iterator* iteratorPtr = writer.done();
- return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
-}
-
-Value DocumentSourceGroup::computeId(const Document& root) {
- // If only one expression, return result directly
- if (_idExpressions.size() == 1) {
- Value retValue = _idExpressions[0]->evaluate(root, &pExpCtx->variables);
- return retValue.missing() ? Value(BSONNULL) : std::move(retValue);
- }
-
- // Multiple expressions get results wrapped in a vector
- vector<Value> vals;
- vals.reserve(_idExpressions.size());
- for (size_t i = 0; i < _idExpressions.size(); i++) {
- vals.push_back(_idExpressions[i]->evaluate(root, &pExpCtx->variables));
- }
- return Value(std::move(vals));
-}
-
-Value DocumentSourceGroup::expandId(const Value& val) {
- // _id doesn't get wrapped in a document
- if (_idFieldNames.empty())
- return val;
-
- // _id is a single-field document containing val
- if (_idFieldNames.size() == 1)
- return Value(DOC(_idFieldNames[0] << val));
-
- // _id is a multi-field document containing the elements of val
- const vector<Value>& vals = val.getArray();
- invariant(_idFieldNames.size() == vals.size());
- MutableDocument md(vals.size());
- for (size_t i = 0; i < vals.size(); i++) {
- md[_idFieldNames[i]] = vals[i];
- }
- return md.freezeToValue();
-}
-
-Document DocumentSourceGroup::makeDocument(const Value& id,
- const Accumulators& accums,
- bool mergeableOutput) {
- const size_t n = _accumulatedFields.size();
- MutableDocument out(1 + n);
-
- /* add the _id field */
- out.addField("_id", expandId(id));
-
- /* add the rest of the fields */
- for (size_t i = 0; i < n; ++i) {
- Value val = accums[i]->getValue(mergeableOutput);
- if (val.missing()) {
- // we return null in this case so return objects are predictable
- out.addField(_accumulatedFields[i].fieldName, Value(BSONNULL));
- } else {
- out.addField(_accumulatedFields[i].fieldName, std::move(val));
- }
- }
-
- _stats.totalOutputDataSizeBytes += out.getApproximateSize();
- return out.freeze();
-}
-
-boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distributedPlanLogic() {
- intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx));
- mergingGroup->setDoingMerge(true);
-
- VariablesParseState vps = pExpCtx->variablesParseState;
- /* the merger will use the same grouping key */
- mergingGroup->setIdExpression(ExpressionFieldPath::parse(pExpCtx.get(), "$$ROOT._id", vps));
-
- for (auto&& accumulatedField : _accumulatedFields) {
- // The merger's output field names will be the same, as will the accumulator factories.
- // However, for some accumulators, the expression to be accumulated will be different. The
- // original accumulator may be collecting an expression based on a field expression or
- // constant. Here, we accumulate the output of the same name from the prior group.
- auto copiedAccumulatedField = accumulatedField;
- copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse(
- pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps);
- mergingGroup->addAccumulator(copiedAccumulatedField);
- mergingGroup->_memoryTracker.set(copiedAccumulatedField.fieldName, 0);
- }
-
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{this, mergingGroup, boost::none};
-}
-
-bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const {
- return std::any_of(
- _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) {
- if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) {
- if (fieldExp->representsPath(dottedPath)) {
- return true;
- }
- }
- return false;
- });
-}
-
-bool DocumentSourceGroup::canRunInParallelBeforeWriteStage(
- const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const {
- if (_doingMerge) {
- return true; // This is fine.
- }
-
- // Certain $group stages are allowed to execute on each exchange consumer. In order to
- // guarantee each consumer will only group together data from its own shard, the $group must
- // group on a superset of the shard key.
- for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) {
- if (!pathIncludedInGroupKeys(currentPathOfShardKey)) {
- // This requires an exact path match, but as a future optimization certain path
- // prefixes should be okay. For example, if the shard key path is "a.b", and we're
- // grouping by "a", then each group of "a" is strictly more specific than "a.b", so
- // we can deduce that grouping by "a" will not need to group together documents
- // across different values of the shard key field "a.b", and thus as long as any
- // other shard key fields are similarly preserved will not need to consume a merged
- // stream to perform the group.
- return false;
- }
- }
- return true;
-}
-
-std::unique_ptr<GroupFromFirstDocumentTransformation>
-DocumentSourceGroup::rewriteGroupAsTransformOnFirstDocument() const {
- if (_idExpressions.size() != 1) {
- // This transformation is only intended for $group stages that group on a single field.
- return nullptr;
- }
-
- auto fieldPathExpr = dynamic_cast<ExpressionFieldPath*>(_idExpressions.front().get());
- if (!fieldPathExpr || fieldPathExpr->isVariableReference()) {
- return nullptr;
- }
-
- const auto fieldPath = fieldPathExpr->getFieldPath();
- if (fieldPath.getPathLength() == 1) {
- // The path is $$CURRENT or $$ROOT. This isn't really a sensible value to group by (since
- // each document has a unique _id, it will just return the entire collection). We only
- // apply the rewrite when grouping by a single field, so we cannot apply it in this case,
- // where we are grouping by the entire document.
- tassert(5943200,
- "Optimization attempted on group by always-dissimilar system variable",
- fieldPath.getFieldName(0) == "CURRENT" || fieldPath.getFieldName(0) == "ROOT");
- return nullptr;
- }
-
- const auto groupId = fieldPath.tail().fullPath();
-
- // We can't do this transformation if there are any non-$first accumulators.
- for (auto&& accumulator : _accumulatedFields) {
- if (AccumulatorDocumentsNeeded::kFirstDocument !=
- accumulator.makeAccumulator()->documentsNeeded()) {
- return nullptr;
- }
- }
-
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fields;
-
- boost::intrusive_ptr<Expression> idField;
- // The _id field can be specified either as a fieldpath (ex. _id: "$a") or as a singleton
- // object (ex. _id: {v: "$a"}).
- if (_idFieldNames.empty()) {
- idField = ExpressionFieldPath::deprecatedCreate(pExpCtx.get(), groupId);
- } else {
- invariant(_idFieldNames.size() == 1);
- idField = ExpressionObject::create(pExpCtx.get(),
- {{_idFieldNames.front(), _idExpressions.front()}});
- }
- fields.push_back(std::make_pair("_id", idField));
-
- for (auto&& accumulator : _accumulatedFields) {
- fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument));
-
- // Since we don't attempt this transformation for non-$first accumulators,
- // the initializer should always be trivial.
- }
-
- return GroupFromFirstDocumentTransformation::create(pExpCtx, groupId, std::move(fields));
-}
-
-size_t DocumentSourceGroup::getMaxMemoryUsageBytes() const {
- return _memoryTracker._maxAllowedMemoryUsageBytes;
-}
-
} // namespace mongo
-
-#include "mongo/db/sorter/sorter.cpp"
-// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 2a3ac2ea89c..a1f8e9b2b9a 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -32,90 +32,19 @@
#include <memory>
#include <utility>
-#include "mongo/db/pipeline/accumulation_statement.h"
-#include "mongo/db/pipeline/accumulator.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/memory_usage_tracker.h"
-#include "mongo/db/pipeline/transformer_interface.h"
-#include "mongo/db/sorter/sorter.h"
+#include "mongo/db/pipeline/document_source_group_base.h"
namespace mongo {
/**
- * GroupFromFirstTransformation consists of a list of (field name, expression pairs). It returns a
- * document synthesized by assigning each field name in the output document to the result of
- * evaluating the corresponding expression. If the expression evaluates to missing, we assign a
- * value of BSONNULL. This is necessary to match the semantics of $first for missing fields.
+ * This class represents hash based group implementation that stores all groups until source is
+ * depleted and only then starts outputing documents.
*/
-class GroupFromFirstDocumentTransformation final : public TransformerInterface {
+class DocumentSourceGroup final : public DocumentSourceGroupBase {
public:
- GroupFromFirstDocumentTransformation(
- const std::string& groupId,
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs)
- : _accumulatorExprs(std::move(accumulatorExprs)), _groupId(groupId) {}
-
- TransformerType getType() const final {
- return TransformerType::kGroupFromFirstDocument;
- }
-
- /**
- * The path of the field that we are grouping on: i.e., the field in the input document that we
- * will use to create the _id field of the ouptut document.
- */
- const std::string& groupId() const {
- return _groupId;
- }
-
- Document applyTransformation(const Document& input) final;
-
- void optimize() final;
-
- Document serializeTransformation(
- boost::optional<ExplainOptions::Verbosity> explain) const final;
-
- DepsTracker::State addDependencies(DepsTracker* deps) const final;
-
- void addVariableRefs(std::set<Variables::Id>* refs) const final;
-
- DocumentSource::GetModPathsReturn getModifiedPaths() const final;
-
- static std::unique_ptr<GroupFromFirstDocumentTransformation> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const std::string& groupId,
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs);
-
-private:
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs;
- std::string _groupId;
-};
-
-class DocumentSourceGroup final : public DocumentSource {
-public:
- using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>;
- using GroupsMap = ValueUnorderedMap<Accumulators>;
-
static constexpr StringData kStageName = "$group"_sd;
- boost::intrusive_ptr<DocumentSource> optimize() final;
- DepsTracker::State getDependencies(DepsTracker* deps) const final;
- void addVariableRefs(std::set<Variables::Id>* refs) const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
const char* getSourceName() const final;
- GetModPathsReturn getModifiedPaths() const final;
- StringMap<boost::intrusive_ptr<Expression>> getIdFields() const;
-
- /**
- * Can be used to change or swap out individual _id fields, but should not be used
- * once execution has begun.
- */
- std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields();
- const std::vector<AccumulationStatement>& getAccumulatedFields() const;
-
- /**
- * Can be used to change or swap out individual accumulated fields, but should not be used
- * once execution has begun.
- */
- std::vector<AccumulationStatement>& getMutableAccumulatedFields();
/**
* Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none,
@@ -133,199 +62,40 @@ public:
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(StreamType::kBlocking,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed,
- TransactionRequirement::kAllowed,
- LookupRequirement::kAllowed,
- UnionRequirement::kAllowed);
- constraints.canSwapWithMatch = true;
- return constraints;
- }
-
- /**
- * Add an accumulator, which will become a field in each Document that results from grouping.
- */
- void addAccumulator(AccumulationStatement accumulationStatement);
-
- /**
- * Sets the expression to use to determine the group id of each document.
- */
- void setIdExpression(boost::intrusive_ptr<Expression> idExpression);
-
- /**
- * Returns the expression to use to determine the group id of each document.
- */
- boost::intrusive_ptr<Expression> getIdExpression() const;
-
- /**
- * Returns true if this $group stage represents a 'global' $group which is merging together
- * results from earlier partial groups.
- */
- bool doingMerge() const {
- return _doingMerge;
- }
-
- /**
- * Tell this source if it is doing a merge from shards. Defaults to false.
- */
- void setDoingMerge(bool doingMerge) {
- _doingMerge = doingMerge;
- }
-
- /**
- * Returns true if this $group stage used disk during execution and false otherwise.
- */
- bool usedDisk() final {
- return _stats.spills > 0;
- }
-
- const SpecificStats* getSpecificStats() const final {
- return &_stats;
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
- bool canRunInParallelBeforeWriteStage(
- const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final;
-
- /**
- * When possible, creates a document transformer that transforms the first document in a group
- * into one of the output documents of the $group stage. This is possible when we are grouping
- * on a single field and all accumulators are $first (or there are no accumluators).
- *
- * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group,
- * in which case this transformation can replace the actual $group stage in the pipeline
- * (SERVER-9507).
- */
- std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument()
- const;
-
- /**
- * Returns maximum allowed memory footprint.
- */
- size_t getMaxMemoryUsageBytes() const;
-
- // True if this $group can be pushed down to SBE.
- bool sbeCompatible() const {
- return _sbeCompatible;
- }
+ static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes);
protected:
GetNextResult doGetNext() final;
- void doDispose() final;
+
+ bool isSpecFieldReserved(StringData) final {
+ return false;
+ }
private:
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
/**
- * getNext() dispatches to one of these three depending on what type of $group it is. These
- * methods expect '_currentAccumulators' to have been reset before being called, and also expect
- * initialize() to have been called already.
- */
- GetNextResult getNextSpilled();
- GetNextResult getNextStandard();
-
- /**
- * Before returning anything, this source must prepare itself. In a streaming $group,
- * initialize() requests the first document from the previous source, and uses it to prepare the
- * accumulators. In an unsorted $group, initialize() exhausts the previous source before
- * returning. The '_initialized' boolean indicates that initialize() has finished.
+ * Before returning anything, this source must prepare itself. performBlockingGroup() exhausts
+ * the previous source before
+ * returning. The '_groupsReady' boolean indicates that performBlockingGroup() has finished.
*
* This method may not be able to finish initialization in a single call if 'pSource' returns a
* DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult
* encountered, which may be either kEOF or kPauseExecution.
*/
- GetNextResult initialize();
+ GetNextResult performBlockingGroup();
/**
- * Initializes this $group after any children are potentially initialized see initialize() for
+ * Initializes this $group after any children are initialized. See performBlockingGroup() for
* more details.
*/
- GetNextResult initializeSelf(GetNextResult input);
-
- /**
- * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group
- * does not exhaust the previous stage before returning, and thus does not maintain as large a
- * store of documents at any one time, only an unsorted group can spill to disk.
- */
- std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
-
- /**
- * If we ran out of memory, finish all the pending operations so that some memory
- * can be freed.
- */
- void freeMemory();
-
- Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
-
- /**
- * Computes the internal representation of the group key.
- */
- Value computeId(const Document& root);
-
- /**
- * Converts the internal representation of the group key to the _id shape specified by the
- * user.
- */
- Value expandId(const Value& val);
-
- /**
- * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'.
- */
- bool pathIncludedInGroupKeys(const std::string& dottedPath) const;
-
- /**
- * Cleans up any pending memory usage. Throws error, if memory usage is above
- * 'maxMemoryUsageBytes' and cannot spill to disk.
- *
- * Returns true, if the caller should spill to disk, false otherwise.
- */
- bool shouldSpillWithAttemptToSaveMemory();
-
- std::vector<AccumulationStatement> _accumulatedFields;
-
- bool _doingMerge;
-
- MemoryUsageTracker _memoryTracker;
-
- GroupStats _stats;
-
- std::shared_ptr<Sorter<Value, Value>::File> _file;
-
- // If the expression for the '_id' field represents a non-empty object, we track its fields'
- // names in '_idFieldNames'.
- std::vector<std::string> _idFieldNames;
- // Expressions for the individual fields when '_id' produces a document in the order of
- // '_idFieldNames' or the whole expression otherwise.
- std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
-
- bool _initialized;
-
- Value _currentId;
- Accumulators _currentAccumulators;
-
- // We use boost::optional to defer initialization until the ExpressionContext containing the
- // correct comparator is injected, since the groups must be built using the comparator's
- // definition of equality.
- boost::optional<GroupsMap> _groups;
-
- std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
- bool _spilled;
-
- // Only used when '_spilled' is false.
- GroupsMap::iterator groupsIterator;
-
- // Only used when '_spilled' is true.
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
-
- std::pair<Value, Value> _firstPartOfNextGroup;
+ GetNextResult performBlockingGroupSelf(GetNextResult input);
- bool _sbeCompatible;
+ bool _groupsReady;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group_base.cpp b/src/mongo/db/pipeline/document_source_group_base.cpp
new file mode 100644
index 00000000000..aea2b47084a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_group_base.cpp
@@ -0,0 +1,821 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_group_base.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/expression_dependencies.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/util/destructor_guard.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * Generates a new file name on each call using a static, atomic and monotonically increasing
+ * number.
+ *
+ * Each user of the Sorter must implement this function to ensure that all temporary files that the
+ * Sorter instances produce are uniquely identified using a unique file name extension with separate
+ * atomic variable. This is necessary because the sorter.cpp code is separately included in multiple
+ * places, rather than compiled in one place and linked, and so cannot provide a globally unique ID.
+ */
+std::string nextFileName() {
+ static AtomicWord<unsigned> documentSourceGroupFileCounter;
+ return "extsort-doc-group." + std::to_string(documentSourceGroupFileCounter.fetchAndAdd(1));
+}
+
+} // namespace
+
+using boost::intrusive_ptr;
+using std::pair;
+using std::shared_ptr;
+using std::vector;
+
+Value DocumentSourceGroupBase::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ MutableDocument insides;
+
+ // Add the _id.
+ if (_idFieldNames.empty()) {
+ invariant(_idExpressions.size() == 1);
+ insides["_id"] = _idExpressions[0]->serialize(static_cast<bool>(explain));
+ } else {
+ // Decomposed document case.
+ invariant(_idExpressions.size() == _idFieldNames.size());
+ MutableDocument md;
+ for (size_t i = 0; i < _idExpressions.size(); i++) {
+ md[_idFieldNames[i]] = _idExpressions[i]->serialize(static_cast<bool>(explain));
+ }
+ insides["_id"] = md.freezeToValue();
+ }
+
+ // Add the remaining fields.
+ for (auto&& accumulatedField : _accumulatedFields) {
+ intrusive_ptr<AccumulatorState> accum = accumulatedField.makeAccumulator();
+ insides[accumulatedField.fieldName] =
+ Value(accum->serialize(accumulatedField.expr.initializer,
+ accumulatedField.expr.argument,
+ static_cast<bool>(explain)));
+ }
+
+ if (_doingMerge) {
+ insides["$doingMerge"] = Value(true);
+ }
+
+ serializeAdditionalFields(insides, explain);
+
+ MutableDocument out;
+ out[getSourceName()] = insides.freezeToValue();
+
+ if (explain && *explain >= ExplainOptions::Verbosity::kExecStats) {
+ MutableDocument md;
+
+ for (size_t i = 0; i < _accumulatedFields.size(); i++) {
+ md[_accumulatedFields[i].fieldName] = Value(static_cast<long long>(
+ _memoryTracker[_accumulatedFields[i].fieldName].maxMemoryBytes()));
+ }
+
+ out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue());
+ out["totalOutputDataSizeBytes"] =
+ Value(static_cast<long long>(_stats.totalOutputDataSizeBytes));
+ out["usedDisk"] = Value(_stats.spills > 0);
+ out["spills"] = Value(static_cast<long long>(_stats.spills));
+ }
+
+ return out.freezeToValue();
+}
+
+
+bool DocumentSourceGroupBase::shouldSpillWithAttemptToSaveMemory() {
+ if (!_memoryTracker._allowDiskUse &&
+ (_memoryTracker.currentMemoryBytes() >
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes))) {
+ freeMemory();
+ }
+
+ if (_memoryTracker.currentMemoryBytes() >
+ static_cast<long long>(_memoryTracker._maxAllowedMemoryUsageBytes)) {
+ uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ "Exceeded memory limit for $group, but didn't allow external sort."
+ " Pass allowDiskUse:true to opt in.",
+ _memoryTracker._allowDiskUse);
+ _memoryTracker.resetCurrent();
+ return true;
+ }
+ return false;
+}
+
+void DocumentSourceGroupBase::freeMemory() {
+ invariant(_groups);
+ for (auto&& group : *_groups) {
+ for (size_t i = 0; i < group.second.size(); i++) {
+ // Subtract the current usage.
+ _memoryTracker.update(_accumulatedFields[i].fieldName,
+ -1 * group.second[i]->getMemUsage());
+
+ group.second[i]->reduceMemoryConsumptionIfAble();
+
+ // Update the memory usage for this AccumulationStatement.
+ _memoryTracker.update(_accumulatedFields[i].fieldName, group.second[i]->getMemUsage());
+ }
+ }
+}
+
+DocumentSource::GetNextResult DocumentSourceGroupBase::getNextReadyGroup() {
+ if (_spilled) {
+ return getNextSpilled();
+ } else {
+ return getNextStandard();
+ }
+}
+
+DocumentSource::GetNextResult DocumentSourceGroupBase::getNextSpilled() {
+ // We aren't streaming, and we have spilled to disk.
+ if (!_sorterIterator)
+ return GetNextResult::makeEOF();
+
+ Value currentId = _firstPartOfNextGroup.first;
+ const size_t numAccumulators = _accumulatedFields.size();
+
+ // Call startNewGroup on every accumulator.
+ Value expandedId = expandId(currentId);
+ Document idDoc =
+ expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
+ for (size_t i = 0; i < numAccumulators; ++i) {
+ Value initializerValue =
+ _accumulatedFields[i].expr.initializer->evaluate(idDoc, &pExpCtx->variables);
+ _currentAccumulators[i]->reset();
+ _currentAccumulators[i]->startNewGroup(initializerValue);
+ }
+
+ while (pExpCtx->getValueComparator().evaluate(currentId == _firstPartOfNextGroup.first)) {
+ // Inside of this loop, _firstPartOfNextGroup is the current data being processed.
+ // At loop exit, it is the first value to be processed in the next group.
+ switch (numAccumulators) { // mirrors switch in spill()
+ case 1: // Single accumulators serialize as a single Value.
+ _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true);
+ [[fallthrough]];
+ case 0: // No accumulators so no Values.
+ break;
+ default: { // Multiple accumulators serialize as an array of Values.
+ const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray();
+ for (size_t i = 0; i < numAccumulators; i++) {
+ _currentAccumulators[i]->process(accumulatorStates[i], true);
+ }
+ }
+ }
+
+ if (!_sorterIterator->more()) {
+ _sorterIterator.reset();
+ break;
+ }
+
+ _firstPartOfNextGroup = _sorterIterator->next();
+ }
+
+ return makeDocument(currentId, _currentAccumulators, pExpCtx->needsMerge);
+}
+
+DocumentSource::GetNextResult DocumentSourceGroupBase::getNextStandard() {
+ // Not spilled, and not streaming.
+ if (_groupsIterator == _groups->end())
+ return GetNextResult::makeEOF();
+
+ Document out =
+ makeDocument(_groupsIterator->first, _groupsIterator->second, pExpCtx->needsMerge);
+ ++_groupsIterator;
+ return out;
+}
+
+void DocumentSourceGroupBase::doDispose() {
+ resetReadyGroups();
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceGroupBase::optimize() {
+ // Optimizing a 'DocumentSourceGroupBase' might modify its expressions to become incompatible
+ // with SBE. We temporarily highjack the context's 'sbeCompatible' flag to communicate the
+ // situation back to the 'DocumentSourceGroupBase'. Notice, that while a particular
+ // 'DocumentSourceGroupBase' might become incompatible with SBE, other groups in the pipeline
+ // and the collection access could be still eligible for lowering to SBE, thus we must reset the
+ // context's 'sbeCompatible' flag back to its original value at the end of the 'optimize()'
+ // call.
+ //
+ // TODO SERVER-XXXXX: replace this hack with a proper per-stage tracking of SBE compatibility.
+ auto expCtx = _idExpressions[0]->getExpressionContext();
+ auto orgSbeCompatible = expCtx->sbeCompatible;
+ expCtx->sbeCompatible = true;
+
+ // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there
+ // will be only one group. We should take advantage of that to avoid going through the hash
+ // table.
+ for (size_t i = 0; i < _idExpressions.size(); i++) {
+ _idExpressions[i] = _idExpressions[i]->optimize();
+ }
+
+ for (auto&& accumulatedField : _accumulatedFields) {
+ accumulatedField.expr.initializer = accumulatedField.expr.initializer->optimize();
+ accumulatedField.expr.argument = accumulatedField.expr.argument->optimize();
+ }
+
+ _sbeCompatible = _sbeCompatible && expCtx->sbeCompatible;
+ expCtx->sbeCompatible = orgSbeCompatible;
+
+ return this;
+}
+
+DepsTracker::State DocumentSourceGroupBase::getDependencies(DepsTracker* deps) const {
+ // add the _id
+ for (size_t i = 0; i < _idExpressions.size(); i++) {
+ expression::addDependencies(_idExpressions[i].get(), deps);
+ }
+
+ // add the rest
+ for (auto&& accumulatedField : _accumulatedFields) {
+ expression::addDependencies(accumulatedField.expr.argument.get(), deps);
+ // Don't add initializer, because it doesn't refer to docs from the input stream.
+ }
+
+ return DepsTracker::State::EXHAUSTIVE_ALL;
+}
+
+void DocumentSourceGroupBase::addVariableRefs(std::set<Variables::Id>* refs) const {
+ for (const auto& idExpr : _idExpressions) {
+ expression::addVariableRefs(idExpr.get(), refs);
+ }
+
+ for (auto&& accumulatedField : _accumulatedFields) {
+ expression::addVariableRefs(accumulatedField.expr.argument.get(), refs);
+ }
+}
+
+DocumentSource::GetModPathsReturn DocumentSourceGroupBase::getModifiedPaths() const {
+ // We preserve none of the fields, but any fields referenced as part of the group key are
+ // logically just renamed.
+ StringMap<std::string> renames;
+ for (std::size_t i = 0; i < _idExpressions.size(); ++i) {
+ auto idExp = _idExpressions[i];
+ auto pathToPutResultOfExpression =
+ _idFieldNames.empty() ? "_id" : "_id." + _idFieldNames[i];
+ auto computedPaths = idExp->getComputedPaths(pathToPutResultOfExpression);
+ for (auto&& rename : computedPaths.renames) {
+ renames[rename.first] = rename.second;
+ }
+ }
+
+ return {DocumentSource::GetModPathsReturn::Type::kAllExcept,
+ OrderedPathSet{}, // No fields are preserved.
+ std::move(renames)};
+}
+
+StringMap<boost::intrusive_ptr<Expression>> DocumentSourceGroupBase::getIdFields() const {
+ if (_idFieldNames.empty()) {
+ invariant(_idExpressions.size() == 1);
+ return {{"_id", _idExpressions[0]}};
+ } else {
+ invariant(_idFieldNames.size() == _idExpressions.size());
+ StringMap<boost::intrusive_ptr<Expression>> result;
+ for (std::size_t i = 0; i < _idFieldNames.size(); ++i) {
+ result["_id." + _idFieldNames[i]] = _idExpressions[i];
+ }
+ return result;
+ }
+}
+
+std::vector<boost::intrusive_ptr<Expression>>& DocumentSourceGroupBase::getMutableIdFields() {
+ tassert(7020503, "Can't mutate _id fields after initialization", !_executionStarted);
+ return _idExpressions;
+}
+
+const std::vector<AccumulationStatement>& DocumentSourceGroupBase::getAccumulatedFields() const {
+ return _accumulatedFields;
+}
+
+std::vector<AccumulationStatement>& DocumentSourceGroupBase::getMutableAccumulatedFields() {
+ tassert(7020504, "Can't mutate accumulated fields after initialization", !_executionStarted);
+ return _accumulatedFields;
+}
+
+DocumentSourceGroupBase::DocumentSourceGroupBase(StringData stageName,
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes)
+ : DocumentSource(stageName, expCtx),
+ _doingMerge(false),
+ _memoryTracker{expCtx->allowDiskUse && !expCtx->inMongos,
+ maxMemoryUsageBytes
+ ? *maxMemoryUsageBytes
+ : static_cast<size_t>(internalDocumentSourceGroupMaxMemoryBytes.load())},
+ _executionStarted(false),
+ _groups(expCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
+ _spilled(false),
+ _sbeCompatible(false) {}
+
+void DocumentSourceGroupBase::addAccumulator(AccumulationStatement accumulationStatement) {
+ _accumulatedFields.push_back(accumulationStatement);
+ _memoryTracker.set(accumulationStatement.fieldName, 0);
+}
+
+namespace {
+
+intrusive_ptr<Expression> parseIdExpression(const intrusive_ptr<ExpressionContext>& expCtx,
+ BSONElement groupField,
+ const VariablesParseState& vps) {
+ if (groupField.type() == Object) {
+ // {_id: {}} is treated as grouping on a constant, not an expression
+ if (groupField.Obj().isEmpty()) {
+ return ExpressionConstant::create(expCtx.get(), Value(groupField));
+ }
+
+ const BSONObj idKeyObj = groupField.Obj();
+ if (idKeyObj.firstElementFieldName()[0] == '$') {
+ // grouping on a $op expression
+ return Expression::parseObject(expCtx.get(), idKeyObj, vps);
+ } else {
+ for (auto&& field : idKeyObj) {
+ uassert(17390,
+ "$group does not support inclusion-style expressions",
+ !field.isNumber() && field.type() != Bool);
+ }
+ return ExpressionObject::parse(expCtx.get(), idKeyObj, vps);
+ }
+ } else {
+ return Expression::parseOperand(expCtx.get(), groupField, vps);
+ }
+}
+
+} // namespace
+
+void DocumentSourceGroupBase::setIdExpression(const boost::intrusive_ptr<Expression> idExpression) {
+ if (auto object = dynamic_cast<ExpressionObject*>(idExpression.get())) {
+ auto& childExpressions = object->getChildExpressions();
+ invariant(!childExpressions.empty()); // We expect to have converted an empty object into a
+ // constant expression.
+
+ // grouping on an "artificial" object. Rather than create the object for each input
+ // in initialize(), instead group on the output of the raw expressions. The artificial
+ // object will be created at the end in makeDocument() while outputting results.
+ for (auto&& childExpPair : childExpressions) {
+ _idFieldNames.push_back(childExpPair.first);
+ _idExpressions.push_back(childExpPair.second);
+ }
+ } else {
+ _idExpressions.push_back(idExpression);
+ }
+}
+
+boost::intrusive_ptr<Expression> DocumentSourceGroupBase::getIdExpression() const {
+ // _idFieldNames is empty and _idExpressions has one element when the _id expression is not an
+ // object expression.
+ if (_idFieldNames.empty() && _idExpressions.size() == 1) {
+ return _idExpressions[0];
+ }
+
+ tassert(6586300,
+ "Field and its expression must be always paired in ExpressionObject",
+ _idFieldNames.size() > 0 && _idFieldNames.size() == _idExpressions.size());
+
+ // Each expression in '_idExpressions' may have been optimized and so, compose the object _id
+ // expression out of the optimized expressions.
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fieldsAndExprs;
+ for (size_t i = 0; i < _idExpressions.size(); ++i) {
+ fieldsAndExprs.emplace_back(_idFieldNames[i], _idExpressions[i]);
+ }
+
+ return ExpressionObject::create(_idExpressions[0]->getExpressionContext(),
+ std::move(fieldsAndExprs));
+}
+
+void DocumentSourceGroupBase::initializeFromBson(BSONElement elem) {
+ uassert(15947, "a group's fields must be specified in an object", elem.type() == Object);
+
+ BSONObj groupObj(elem.Obj());
+ BSONObjIterator groupIterator(groupObj);
+ VariablesParseState vps = pExpCtx->variablesParseState;
+ pExpCtx->sbeGroupCompatible = true;
+ while (groupIterator.more()) {
+ BSONElement groupField(groupIterator.next());
+ StringData pFieldName = groupField.fieldNameStringData();
+ if (pFieldName == "_id") {
+ uassert(15948, "a group's _id may only be specified once", _idExpressions.empty());
+ setIdExpression(parseIdExpression(pExpCtx, groupField, vps));
+ invariant(!_idExpressions.empty());
+ } else if (pFieldName == "$doingMerge") {
+ massert(17030, "$doingMerge should be true if present", groupField.Bool());
+
+ setDoingMerge(true);
+ } else if (isSpecFieldReserved(pFieldName)) {
+ // No-op: field is used by the derived class.
+ } else {
+ // Any other field will be treated as an accumulator specification.
+ addAccumulator(
+ AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), groupField, vps));
+ }
+ }
+ _sbeCompatible = pExpCtx->sbeGroupCompatible && pExpCtx->sbeCompatible;
+
+ uassert(15955, "a group specification must include an _id", !_idExpressions.empty());
+}
+
+namespace {
+
+using GroupsMap = DocumentSourceGroupBase::GroupsMap;
+
+class SorterComparator {
+public:
+ SorterComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {}
+
+ int operator()(const Value& lhs, const Value& rhs) const {
+ return _valueComparator.compare(lhs, rhs);
+ }
+
+private:
+ ValueComparator _valueComparator;
+};
+
+class SpillSTLComparator {
+public:
+ SpillSTLComparator(ValueComparator valueComparator) : _valueComparator(valueComparator) {}
+
+ bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const {
+ return _valueComparator.evaluate(lhs->first < rhs->first);
+ }
+
+private:
+ ValueComparator _valueComparator;
+};
+} // namespace
+
+void DocumentSourceGroupBase::processDocument(const Value& id, const Document& root) {
+ const size_t numAccumulators = _accumulatedFields.size();
+
+ // Look for the _id value in the map. If it's not there, add a new entry with a blank
+ // accumulator. This is done in a somewhat odd way in order to avoid hashing 'id' and
+ // looking it up in '_groups' multiple times.
+ const size_t oldSize = _groups->size();
+ vector<intrusive_ptr<AccumulatorState>>& group = (*_groups)[id];
+ const bool inserted = _groups->size() != oldSize;
+
+ vector<uint64_t> oldAccumMemUsage(numAccumulators, 0);
+ if (inserted) {
+ _memoryTracker.set(_memoryTracker.currentMemoryBytes() + id.getApproximateSize());
+
+ // Initialize and add the accumulators
+ Value expandedId = expandId(id);
+ Document idDoc =
+ expandedId.getType() == BSONType::Object ? expandedId.getDocument() : Document();
+ group.reserve(numAccumulators);
+ for (auto&& accumulatedField : _accumulatedFields) {
+ auto accum = accumulatedField.makeAccumulator();
+ Value initializerValue =
+ accumulatedField.expr.initializer->evaluate(idDoc, &pExpCtx->variables);
+ accum->startNewGroup(initializerValue);
+ group.push_back(accum);
+ }
+ }
+
+ /* tickle all the accumulators for the group we found */
+ dassert(numAccumulators == group.size());
+
+ for (size_t i = 0; i < numAccumulators; i++) {
+ // Only process the input and update the memory footprint if the current accumulator
+ // needs more input.
+ if (group[i]->needsInput()) {
+ const auto prevMemUsage = inserted ? 0 : group[i]->getMemUsage();
+ group[i]->process(
+ _accumulatedFields[i].expr.argument->evaluate(root, &pExpCtx->variables),
+ _doingMerge);
+ _memoryTracker.update(_accumulatedFields[i].fieldName,
+ group[i]->getMemUsage() - prevMemUsage);
+ }
+ }
+
+ if (kDebugBuild && !pExpCtx->opCtx->readOnly()) {
+ // In debug mode, spill every time we have a duplicate id to stress merge logic.
+ if (!inserted && // is a dup
+ !pExpCtx->inMongos && // can't spill to disk in mongos
+ !_memoryTracker._allowDiskUse && // don't change behavior when testing external sort
+ _sortedFiles.size() < 20) { // don't open too many FDs
+ spill();
+ }
+ }
+}
+
+void DocumentSourceGroupBase::readyGroups() {
+ _spilled = !_sortedFiles.empty();
+ if (_spilled) {
+ if (!_groups->empty()) {
+ spill();
+ }
+
+ _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
+
+ _sorterIterator.reset(Sorter<Value, Value>::Iterator::merge(
+ _sortedFiles, SortOptions(), SorterComparator(pExpCtx->getValueComparator())));
+
+ // prepare current to accumulate data
+ _currentAccumulators.reserve(_accumulatedFields.size());
+ for (auto&& accumulatedField : _accumulatedFields) {
+ _currentAccumulators.push_back(accumulatedField.makeAccumulator());
+ }
+
+ verify(_sorterIterator->more()); // we put data in, we should get something out.
+ _firstPartOfNextGroup = _sorterIterator->next();
+ } else {
+ // start the group iterator
+ _groupsIterator = _groups->begin();
+ }
+}
+
+void DocumentSourceGroupBase::resetReadyGroups() {
+ // Free our resources.
+ _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
+ _memoryTracker.resetCurrent();
+ _sorterIterator.reset();
+ _sortedFiles.clear();
+
+ // Make us look done.
+ _groupsIterator = _groups->end();
+}
+
+void DocumentSourceGroupBase::spill() {
+ _stats.spills++;
+
+ vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting
+ ptrs.reserve(_groups->size());
+ for (GroupsMap::const_iterator it = _groups->begin(), end = _groups->end(); it != end; ++it) {
+ ptrs.push_back(&*it);
+ }
+
+ stable_sort(ptrs.begin(), ptrs.end(), SpillSTLComparator(pExpCtx->getValueComparator()));
+
+ // Initialize '_file' in a lazy manner only when it is needed.
+ if (!_file) {
+ _file =
+ std::make_shared<Sorter<Value, Value>::File>(pExpCtx->tempDir + "/" + nextFileName());
+ }
+ SortedFileWriter<Value, Value> writer(SortOptions().TempDir(pExpCtx->tempDir), _file);
+ switch (_accumulatedFields.size()) { // same as ptrs[i]->second.size() for all i.
+ case 0: // no values, essentially a distinct
+ for (size_t i = 0; i < ptrs.size(); i++) {
+ writer.addAlreadySorted(ptrs[i]->first, Value());
+ }
+ break;
+
+ case 1: // just one value, use optimized serialization as single Value
+ for (size_t i = 0; i < ptrs.size(); i++) {
+ writer.addAlreadySorted(ptrs[i]->first,
+ ptrs[i]->second[0]->getValue(/*toBeMerged=*/true));
+ }
+ break;
+
+ default: // multiple values, serialize as array-typed Value
+ for (size_t i = 0; i < ptrs.size(); i++) {
+ vector<Value> accums;
+ for (size_t j = 0; j < ptrs[i]->second.size(); j++) {
+ accums.push_back(ptrs[i]->second[j]->getValue(/*toBeMerged=*/true));
+ }
+ writer.addAlreadySorted(ptrs[i]->first, Value(std::move(accums)));
+ }
+ break;
+ }
+
+ auto& metricsCollector = ResourceConsumption::MetricsCollector::get(pExpCtx->opCtx);
+ metricsCollector.incrementKeysSorted(ptrs.size());
+ metricsCollector.incrementSorterSpills(1);
+
+ _groups->clear();
+ // Zero out the current per-accumulation statement memory consumption, as the memory has been
+ // freed by spilling.
+ for (const auto& accum : _accumulatedFields) {
+ _memoryTracker.set(accum.fieldName, 0);
+ }
+
+ _sortedFiles.emplace_back(writer.done());
+}
+
+Value DocumentSourceGroupBase::computeId(const Document& root) {
+ // If only one expression, return result directly
+ if (_idExpressions.size() == 1) {
+ Value retValue = _idExpressions[0]->evaluate(root, &pExpCtx->variables);
+ return retValue.missing() ? Value(BSONNULL) : std::move(retValue);
+ }
+
+ // Multiple expressions get results wrapped in a vector
+ vector<Value> vals;
+ vals.reserve(_idExpressions.size());
+ for (size_t i = 0; i < _idExpressions.size(); i++) {
+ vals.push_back(_idExpressions[i]->evaluate(root, &pExpCtx->variables));
+ }
+ return Value(std::move(vals));
+}
+
+Value DocumentSourceGroupBase::expandId(const Value& val) {
+ // _id doesn't get wrapped in a document
+ if (_idFieldNames.empty())
+ return val;
+
+ // _id is a single-field document containing val
+ if (_idFieldNames.size() == 1)
+ return Value(DOC(_idFieldNames[0] << val));
+
+ // _id is a multi-field document containing the elements of val
+ const vector<Value>& vals = val.getArray();
+ invariant(_idFieldNames.size() == vals.size());
+ MutableDocument md(vals.size());
+ for (size_t i = 0; i < vals.size(); i++) {
+ md[_idFieldNames[i]] = vals[i];
+ }
+ return md.freezeToValue();
+}
+
+Document DocumentSourceGroupBase::makeDocument(const Value& id,
+ const Accumulators& accums,
+ bool mergeableOutput) {
+ const size_t n = _accumulatedFields.size();
+ MutableDocument out(1 + n);
+
+ /* add the _id field */
+ out.addField("_id", expandId(id));
+
+ /* add the rest of the fields */
+ for (size_t i = 0; i < n; ++i) {
+ Value val = accums[i]->getValue(mergeableOutput);
+ if (val.missing()) {
+ // we return null in this case so return objects are predictable
+ out.addField(_accumulatedFields[i].fieldName, Value(BSONNULL));
+ } else {
+ out.addField(_accumulatedFields[i].fieldName, std::move(val));
+ }
+ }
+
+ _stats.totalOutputDataSizeBytes += out.getApproximateSize();
+ return out.freeze();
+}
+
+bool DocumentSourceGroupBase::pathIncludedInGroupKeys(const std::string& dottedPath) const {
+ return std::any_of(
+ _idExpressions.begin(), _idExpressions.end(), [&dottedPath](const auto& exp) {
+ if (auto fieldExp = dynamic_cast<ExpressionFieldPath*>(exp.get())) {
+ if (fieldExp->representsPath(dottedPath)) {
+ return true;
+ }
+ }
+ return false;
+ });
+}
+
+bool DocumentSourceGroupBase::canRunInParallelBeforeWriteStage(
+ const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const {
+ if (_doingMerge) {
+ return true; // This is fine.
+ }
+
+ // Certain $group stages are allowed to execute on each exchange consumer. In order to
+ // guarantee each consumer will only group together data from its own shard, the $group must
+ // group on a superset of the shard key.
+ for (auto&& currentPathOfShardKey : nameOfShardKeyFieldsUponEntryToStage) {
+ if (!pathIncludedInGroupKeys(currentPathOfShardKey)) {
+ // This requires an exact path match, but as a future optimization certain path
+ // prefixes should be okay. For example, if the shard key path is "a.b", and we're
+ // grouping by "a", then each group of "a" is strictly more specific than "a.b", so
+ // we can deduce that grouping by "a" will not need to group together documents
+ // across different values of the shard key field "a.b", and thus as long as any
+ // other shard key fields are similarly preserved will not need to consume a merged
+ // stream to perform the group.
+ return false;
+ }
+ }
+ return true;
+}
+
+std::unique_ptr<GroupFromFirstDocumentTransformation>
+DocumentSourceGroupBase::rewriteGroupAsTransformOnFirstDocument() const {
+ if (_idExpressions.size() != 1) {
+ // This transformation is only intended for $group stages that group on a single field.
+ return nullptr;
+ }
+
+ auto fieldPathExpr = dynamic_cast<ExpressionFieldPath*>(_idExpressions.front().get());
+ if (!fieldPathExpr || fieldPathExpr->isVariableReference()) {
+ return nullptr;
+ }
+
+ const auto fieldPath = fieldPathExpr->getFieldPath();
+ if (fieldPath.getPathLength() == 1) {
+ // The path is $$CURRENT or $$ROOT. This isn't really a sensible value to group by (since
+ // each document has a unique _id, it will just return the entire collection). We only
+ // apply the rewrite when grouping by a single field, so we cannot apply it in this case,
+ // where we are grouping by the entire document.
+ tassert(5943200,
+ "Optimization attempted on group by always-dissimilar system variable",
+ fieldPath.getFieldName(0) == "CURRENT" || fieldPath.getFieldName(0) == "ROOT");
+ return nullptr;
+ }
+
+ const auto groupId = fieldPath.tail().fullPath();
+
+ // We can't do this transformation if there are any non-$first accumulators.
+ for (auto&& accumulator : _accumulatedFields) {
+ if (AccumulatorDocumentsNeeded::kFirstDocument !=
+ accumulator.makeAccumulator()->documentsNeeded()) {
+ return nullptr;
+ }
+ }
+
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> fields;
+
+ boost::intrusive_ptr<Expression> idField;
+ // The _id field can be specified either as a fieldpath (ex. _id: "$a") or as a singleton
+ // object (ex. _id: {v: "$a"}).
+ if (_idFieldNames.empty()) {
+ idField = ExpressionFieldPath::deprecatedCreate(pExpCtx.get(), groupId);
+ } else {
+ invariant(_idFieldNames.size() == 1);
+ idField = ExpressionObject::create(pExpCtx.get(),
+ {{_idFieldNames.front(), _idExpressions.front()}});
+ }
+ fields.push_back(std::make_pair("_id", idField));
+
+ for (auto&& accumulator : _accumulatedFields) {
+ fields.push_back(std::make_pair(accumulator.fieldName, accumulator.expr.argument));
+
+ // Since we don't attempt this transformation for non-$first accumulators,
+ // the initializer should always be trivial.
+ }
+
+ return GroupFromFirstDocumentTransformation::create(
+ pExpCtx, groupId, getSourceName(), std::move(fields));
+}
+
+size_t DocumentSourceGroupBase::getMaxMemoryUsageBytes() const {
+ return _memoryTracker._maxAllowedMemoryUsageBytes;
+}
+
+boost::optional<DocumentSource::DistributedPlanLogic>
+DocumentSourceGroupBase::distributedPlanLogic() {
+ VariablesParseState vps = pExpCtx->variablesParseState;
+ /* the merger will use the same grouping key */
+ auto mergerGroupByExpression = ExpressionFieldPath::parse(pExpCtx.get(), "$$ROOT._id", vps);
+
+ std::vector<AccumulationStatement> mergerAccumulators;
+ mergerAccumulators.reserve(_accumulatedFields.size());
+ for (auto&& accumulatedField : _accumulatedFields) {
+ // The merger's output field names will be the same, as will the accumulator factories.
+ // However, for some accumulators, the expression to be accumulated will be different. The
+ // original accumulator may be collecting an expression based on a field expression or
+ // constant. Here, we accumulate the output of the same name from the prior group.
+ auto copiedAccumulatedField = accumulatedField;
+ copiedAccumulatedField.expr.argument = ExpressionFieldPath::parse(
+ pExpCtx.get(), "$$ROOT." + copiedAccumulatedField.fieldName, vps);
+ mergerAccumulators.emplace_back(std::move(copiedAccumulatedField));
+ }
+
+ // When merging, we always use generic hash based algorithm.
+ boost::intrusive_ptr<DocumentSourceGroup> mergingGroup = DocumentSourceGroup::create(
+ pExpCtx, std::move(mergerGroupByExpression), std::move(mergerAccumulators));
+ mergingGroup->setDoingMerge(true);
+ // {shardsStage, mergingStage, sortPattern}
+ return DistributedPlanLogic{this, mergingGroup, boost::none};
+}
+
+} // namespace mongo
+
+#include "mongo/db/sorter/sorter.cpp"
+// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file.
diff --git a/src/mongo/db/pipeline/document_source_group_base.h b/src/mongo/db/pipeline/document_source_group_base.h
new file mode 100644
index 00000000000..9cbd71e9637
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_group_base.h
@@ -0,0 +1,267 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/group_from_first_document_transformation.h"
+#include "mongo/db/pipeline/memory_usage_tracker.h"
+#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+
+/**
+ * This class represents a $group stage generically - could be a streaming or hash based group.
+ *
+ * It contains some common execution code between the two algorithms, such as:
+ * - Handling spilling to disk.
+ * - Computing the group key
+ * - Accumulating values and populating output documents.
+ */
+class DocumentSourceGroupBase : public DocumentSource {
+public:
+ using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>;
+ using GroupsMap = ValueUnorderedMap<Accumulators>;
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+ DepsTracker::State getDependencies(DepsTracker* deps) const final;
+ void addVariableRefs(std::set<Variables::Id>* refs) const final;
+ GetModPathsReturn getModifiedPaths() const final;
+ StringMap<boost::intrusive_ptr<Expression>> getIdFields() const;
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
+
+ /**
+ * Can be used to change or swap out individual _id fields, but should not be used
+ * once execution has begun.
+ */
+ std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields();
+ const std::vector<AccumulationStatement>& getAccumulatedFields() const;
+
+ /**
+ * Can be used to change or swap out individual accumulated fields, but should not be used
+ * once execution has begun.
+ */
+ std::vector<AccumulationStatement>& getMutableAccumulatedFields();
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ StageConstraints constraints(StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed);
+ constraints.canSwapWithMatch = true;
+ return constraints;
+ }
+
+ /**
+ * Add an accumulator, which will become a field in each Document that results from grouping.
+ */
+ void addAccumulator(AccumulationStatement accumulationStatement);
+
+ /**
+ * Sets the expression to use to determine the group id of each document.
+ */
+ void setIdExpression(boost::intrusive_ptr<Expression> idExpression);
+
+ /**
+ * Returns the expression to use to determine the group id of each document.
+ */
+ boost::intrusive_ptr<Expression> getIdExpression() const;
+
+ /**
+ * Returns true if this $group stage represents a 'global' $group which is merging together
+ * results from earlier partial groups.
+ */
+ bool doingMerge() const {
+ return _doingMerge;
+ }
+
+ /**
+ * Tell this source if it is doing a merge from shards. Defaults to false.
+ */
+ void setDoingMerge(bool doingMerge) {
+ _doingMerge = doingMerge;
+ }
+
+ /**
+ * Returns true if this $group stage used disk during execution and false otherwise.
+ */
+ bool usedDisk() final {
+ return _stats.spills > 0;
+ }
+
+ const SpecificStats* getSpecificStats() const final {
+ return &_stats;
+ }
+
+ bool canRunInParallelBeforeWriteStage(
+ const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final;
+
+ /**
+ * When possible, creates a document transformer that transforms the first document in a group
+ * into one of the output documents of the $group stage. This is possible when we are grouping
+ * on a single field and all accumulators are $first (or there are no accumluators).
+ *
+ * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group,
+ * in which case this transformation can replace the actual $group stage in the pipeline
+ * (SERVER-9507).
+ */
+ std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument()
+ const;
+
+ /**
+ * Returns maximum allowed memory footprint.
+ */
+ size_t getMaxMemoryUsageBytes() const;
+
+ // True if this $group can be pushed down to SBE.
+ bool sbeCompatible() const {
+ return _sbeCompatible;
+ }
+
+protected:
+ DocumentSourceGroupBase(StringData stageName,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes = boost::none);
+
+ void initializeFromBson(BSONElement elem);
+ virtual bool isSpecFieldReserved(StringData fieldName) = 0;
+
+ void doDispose() final;
+
+ /**
+ * Cleans up any pending memory usage. Throws error, if memory usage is above
+ * 'maxMemoryUsageBytes' and cannot spill to disk.
+ *
+ * Returns true, if the caller should spill to disk, false otherwise.
+ */
+ bool shouldSpillWithAttemptToSaveMemory();
+
+ /**
+ * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group
+ * does not exhaust the previous stage before returning, and thus does not maintain as large a
+ * store of documents at any one time, only an unsorted group can spill to disk.
+ */
+ void spill();
+
+ /**
+ * Computes the internal representation of the group key.
+ */
+ Value computeId(const Document& root);
+
+ void processDocument(const Value& id, const Document& root);
+
+ void readyGroups();
+ void resetReadyGroups();
+
+ GetNextResult getNextReadyGroup();
+
+ void setExecutionStarted() {
+ _executionStarted = true;
+ }
+
+ virtual void serializeAdditionalFields(
+ MutableDocument& out, boost::optional<ExplainOptions::Verbosity> explain) const {};
+
+ // If the expression for the '_id' field represents a non-empty object, we track its fields'
+ // names in '_idFieldNames'.
+ std::vector<std::string> _idFieldNames;
+ // Expressions for the individual fields when '_id' produces a document in the order of
+ // '_idFieldNames' or the whole expression otherwise.
+ std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
+
+private:
+ GetNextResult getNextSpilled();
+ GetNextResult getNextStandard();
+
+ /**
+ * If we ran out of memory, finish all the pending operations so that some memory
+ * can be freed.
+ */
+ void freeMemory();
+
+ Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
+
+ /**
+ * Converts the internal representation of the group key to the _id shape specified by the
+ * user.
+ */
+ Value expandId(const Value& val);
+
+ /**
+ * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'.
+ */
+ bool pathIncludedInGroupKeys(const std::string& dottedPath) const;
+
+ std::vector<AccumulationStatement> _accumulatedFields;
+
+ bool _doingMerge;
+
+ MemoryUsageTracker _memoryTracker;
+
+ GroupStats _stats;
+
+ /**
+ * This flag should be set during first execution of getNext() to assert that non-const methods
+ * that expose internal structures are not called during runtime.
+ */
+ bool _executionStarted;
+
+ // We use boost::optional to defer initialization until the ExpressionContext containing the
+ // correct comparator is injected, since the groups must be built using the comparator's
+ // definition of equality.
+ boost::optional<GroupsMap> _groups;
+
+ std::shared_ptr<Sorter<Value, Value>::File> _file;
+ std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
+ bool _spilled;
+
+ // Only used when '_spilled' is false.
+ GroupsMap::iterator _groupsIterator;
+
+ // Only used when '_spilled' is true.
+ std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
+
+ std::pair<Value, Value> _firstPartOfNextGroup;
+ Accumulators _currentAccumulators;
+
+ bool _sbeCompatible;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 8867356bc89..ef29cdb2795 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_streaming_group.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/query/query_test_service_context.h"
@@ -251,32 +252,65 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
return arr[0].getDocument().toBson();
}
+enum class GroupStageType { Default, Streaming };
+
class Base : public ServiceContextTest {
public:
- Base()
+ Base(GroupStageType groupStageType = GroupStageType::Default)
: _opCtx(makeOperationContext()),
_ctx(new ExpressionContextForTest(_opCtx.get(),
AggregateCommandRequest(NamespaceString(ns), {}))),
- _tempDir("DocumentSourceGroupTest") {}
+ _tempDir("DocumentSourceGroupTest"),
+ _groupStageType(groupStageType) {}
protected:
+ StringData getStageName() const {
+ switch (_groupStageType) {
+ case GroupStageType::Default:
+ return DocumentSourceGroup::kStageName;
+ case GroupStageType::Streaming:
+ return DocumentSourceStreamingGroup::kStageName;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+
+ virtual boost::optional<size_t> getMaxMemoryUsageBytes() {
+ return boost::none;
+ }
+
+ intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement specElement, intrusive_ptr<ExpressionContext> expressionContext) {
+ switch (_groupStageType) {
+ case GroupStageType::Default:
+ return DocumentSourceGroup::createFromBsonWithMaxMemoryUsage(
+ std::move(specElement), expressionContext, getMaxMemoryUsageBytes());
+ case GroupStageType::Streaming:
+ return DocumentSourceStreamingGroup::createFromBsonWithMaxMemoryUsage(
+ std::move(specElement), expressionContext, getMaxMemoryUsageBytes());
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+
void createGroup(const BSONObj& spec, bool inShard = false, bool inMongos = false) {
- BSONObj namedSpec = BSON("$group" << spec);
+ BSONObj namedSpec = BSON(getStageName() << spec);
BSONElement specElement = namedSpec.firstElement();
intrusive_ptr<ExpressionContextForTest> expressionContext = new ExpressionContextForTest(
_opCtx.get(), AggregateCommandRequest(NamespaceString(ns), {}));
+ expressionContext->allowDiskUse = true;
// For $group, 'inShard' implies 'fromMongos' and 'needsMerge'.
expressionContext->fromMongos = expressionContext->needsMerge = inShard;
expressionContext->inMongos = inMongos;
// Won't spill to disk properly if it needs to.
expressionContext->tempDir = _tempDir.path();
- _group = DocumentSourceGroup::createFromBson(specElement, expressionContext);
+ _group = createFromBson(specElement, expressionContext);
assertRoundTrips(_group, expressionContext);
}
- DocumentSourceGroup* group() {
- return static_cast<DocumentSourceGroup*>(_group.get());
+ DocumentSourceGroupBase* group() {
+ return static_cast<DocumentSourceGroupBase*>(_group.get());
}
/** Assert that iterator state accessors consistently report the source is exhausted. */
void assertEOF(const intrusive_ptr<DocumentSource>& source) const {
@@ -298,8 +332,7 @@ private:
// $const operators may be introduced in the first serialization.
BSONObj spec = toBson(group);
BSONElement specElement = spec.firstElement();
- intrusive_ptr<DocumentSource> generated =
- DocumentSourceGroup::createFromBson(specElement, expCtx);
+ intrusive_ptr<DocumentSource> generated = createFromBson(specElement, expCtx);
ASSERT_BSONOBJ_EQ(spec, toBson(generated));
}
std::unique_ptr<QueryTestServiceContext> _queryServiceContext;
@@ -307,6 +340,7 @@ private:
intrusive_ptr<ExpressionContextForTest> _ctx;
intrusive_ptr<DocumentSource> _group;
TempDir _tempDir;
+ GroupStageType _groupStageType;
};
class ParseErrorBase : public Base {
@@ -354,10 +388,9 @@ class IdConstantBase : public ExpressionBase {
class NonObject : public Base {
public:
void _doTest() final {
- BSONObj spec = BSON("$group"
- << "foo");
+ BSONObj spec = BSON(getStageName() << "foo");
BSONElement specElement = spec.firstElement();
- ASSERT_THROWS(DocumentSourceGroup::createFromBson(specElement, ctx()), AssertionException);
+ ASSERT_THROWS(createFromBson(specElement, ctx()), AssertionException);
}
};
@@ -556,8 +589,10 @@ typedef map<Value, Document, ValueCmp> IdMap;
class CheckResultsBase : public Base {
public:
+ CheckResultsBase(GroupStageType groupStageType = GroupStageType::Default)
+ : Base(groupStageType) {}
virtual ~CheckResultsBase() {}
- void _doTest() {
+ void _doTest() override {
runSharded(false);
runSharded(true);
}
@@ -570,7 +605,7 @@ public:
if (sharded) {
sink = createMerger();
// Serialize and re-parse the shard stage.
- createGroup(toBson(group())["$group"].Obj(), true);
+ createGroup(toBson(group())[group()->getSourceName()].Obj(), true);
group()->setSource(source.get());
sink->setSource(group());
}
@@ -870,6 +905,298 @@ public:
}
};
+class StreamingSimple final : public CheckResultsBase {
+public:
+ StreamingSimple() : CheckResultsBase(GroupStageType::Streaming) {}
+
+private:
+ deque<DocumentSource::GetNextResult> inputData() final {
+ return {Document(BSON("a" << 1 << "b" << 1)),
+ Document(BSON("a" << 1 << "b" << 2)),
+ Document(BSON("a" << 2 << "b" << 3)),
+ Document(BSON("a" << 2 << "b" << 1))};
+ }
+ BSONObj groupSpec() final {
+ return BSON("_id"
+ << "$a"
+ << "sum"
+ << BSON("$sum"
+ << "$b")
+ << "$monotonicIdFields" << BSON_ARRAY("_id"));
+ }
+ string expectedResultSetString() final {
+ return "[{_id:1,sum:3},{_id:2,sum:4}]";
+ }
+};
+
+constexpr size_t kBigStringSize = 1024;
+const std::string kBigString(kBigStringSize, 'a');
+
+class CheckResultsAndSpills : public CheckResultsBase {
+public:
+ CheckResultsAndSpills(GroupStageType groupStageType, uint64_t expectedSpills)
+ : CheckResultsBase(groupStageType), _expectedSpills(expectedSpills) {}
+
+ void _doTest() final {
+ for (int sharded = 0; sharded < 2; ++sharded) {
+ runSharded(sharded);
+ const auto* groupStats = static_cast<const GroupStats*>(group()->getSpecificStats());
+ ASSERT_EQ(groupStats->spills, _expectedSpills);
+ }
+ }
+
+private:
+ uint64_t _expectedSpills;
+};
+
+template <GroupStageType groupStageType, uint64_t expectedSpills>
+class StreamingSpillTest : public CheckResultsAndSpills {
+public:
+ StreamingSpillTest() : CheckResultsAndSpills(groupStageType, expectedSpills) {}
+
+private:
+ static constexpr int kCount = 11;
+
+ deque<DocumentSource::GetNextResult> inputData() final {
+ deque<DocumentSource::GetNextResult> queue;
+ for (int i = 0; i < kCount; ++i) {
+ queue.emplace_back(Document(BSON("a" << i << "b" << kBigString)));
+ }
+ return queue;
+ }
+
+ BSONObj groupSpec() final {
+ BSONObjBuilder builder;
+ builder.append("_id", "$a");
+ BSONObjBuilder accum(builder.subobjStart("big_array"));
+ accum.append("$push", "$b");
+ accum.done();
+
+ if constexpr (groupStageType == GroupStageType::Streaming) {
+ BSONArrayBuilder sub(builder.subarrayStart("$monotonicIdFields"));
+ sub.append("_id").done();
+ }
+ return builder.done();
+ }
+
+ boost::optional<size_t> getMaxMemoryUsageBytes() final {
+ return 10 * kBigStringSize;
+ }
+
+ BSONObj expectedResultSet() final {
+ BSONArrayBuilder result;
+ for (int i = 0; i < kCount; ++i) {
+ result.append(BSON("_id" << i << "big_array" << BSON_ARRAY(kBigString)));
+ }
+ return result.arr();
+ }
+};
+
+class WithoutStreamingSpills final
+ : public StreamingSpillTest<GroupStageType::Default, 2 /*expectedSpills*/> {};
+class StreamingDoesNotSpill final
+ : public StreamingSpillTest<GroupStageType::Streaming, 0 /*expectedSpills*/> {};
+
+class StreamingCanSpill final : public CheckResultsAndSpills {
+public:
+ StreamingCanSpill() : CheckResultsAndSpills(GroupStageType::Streaming, 2 /*expectedSpills*/) {}
+
+private:
+ static constexpr int kCount = 11;
+
+ deque<DocumentSource::GetNextResult> inputData() final {
+ deque<DocumentSource::GetNextResult> queue;
+ for (int i = 0; i < kCount; ++i) {
+ queue.emplace_back(Document(BSON("x" << 0 << "y" << i << "b" << kBigString)));
+ }
+ return queue;
+ }
+
+ BSONObj groupSpec() final {
+ auto id = BSON("x"
+ << "$x"
+ << "y"
+ << "$y");
+ return BSON("_id" << id << "big_array"
+ << BSON("$push"
+ << "$b")
+ << "$monotonicIdFields" << BSON_ARRAY("x"));
+ }
+
+ boost::optional<size_t> getMaxMemoryUsageBytes() final {
+ return 10 * kBigStringSize;
+ }
+
+ BSONObj expectedResultSet() final {
+ BSONArrayBuilder result;
+ for (int i = 0; i < kCount; ++i) {
+ auto id = BSON("x" << 0 << "y" << i);
+ result.append(BSON("_id" << id << "big_array" << BSON_ARRAY(kBigString)));
+ }
+ return result.done();
+ }
+};
+
+class StreamingAlternatingSpillAndNoSpillBatches : public CheckResultsAndSpills {
+public:
+ StreamingAlternatingSpillAndNoSpillBatches()
+ : CheckResultsAndSpills(GroupStageType::Streaming, 3 /*expectedSpills*/) {}
+
+private:
+ static constexpr int kCount = 12;
+
+ deque<DocumentSource::GetNextResult> inputData() final {
+ deque<DocumentSource::GetNextResult> queue;
+ for (int i = 0; i < kCount; ++i) {
+ // For groups with i % 3 == 0 and i % 3 == 1 there should be no spilling, but groups
+ // with i % 3 == 2 should spill.
+ for (int j = 0; j < (i % 3) + 1; ++j) {
+ queue.emplace_back(Document(BSON("a" << i << "b" << kBigString)));
+ }
+ }
+ return queue;
+ }
+
+ BSONObj groupSpec() final {
+ return BSON("_id"
+ << "$a"
+ << "big_array"
+ << BSON("$push"
+ << "$b")
+ << "$monotonicIdFields" << BSON_ARRAY("_id"));
+ }
+
+ boost::optional<size_t> getMaxMemoryUsageBytes() final {
+ return (25 * kBigStringSize) / 10;
+ }
+
+ BSONObj expectedResultSet() final {
+ BSONArrayBuilder result;
+ for (int i = 0; i < kCount; ++i) {
+ BSONArrayBuilder bigArray;
+ for (int j = 0; j < (i % 3) + 1; ++j) {
+ bigArray.append(kBigString);
+ }
+ result.append(BSON("_id" << i << "big_array" << bigArray.arr()));
+ }
+ return result.done();
+ }
+};
+
+class StreamingComplex final : public CheckResultsBase {
+public:
+ StreamingComplex() : CheckResultsBase(GroupStageType::Streaming) {}
+
+private:
+ static constexpr int kCount = 3;
+
+ deque<DocumentSource::GetNextResult> inputData() final {
+ deque<DocumentSource::GetNextResult> queue;
+ for (int i = 0; i < kCount; ++i) {
+ for (int j = 0; j < kCount; ++j) {
+ for (int k = 0; k < kCount; ++k) {
+ queue.emplace_back(Document(BSON("x" << i << "y" << j << "z" << k)));
+ }
+ }
+ }
+ return queue;
+ }
+
+ BSONObj groupSpec() final {
+ BSONObj id = BSON("x"
+ << "$x"
+ << "y"
+ << "$y");
+ return BSON("_id" << id << "sum"
+ << BSON("$sum"
+ << "$z")
+ << "$monotonicIdFields" << BSON_ARRAY("x"));
+ }
+
+ boost::optional<size_t> getMaxMemoryUsageBytes() final {
+ return 10 * kBigStringSize;
+ }
+
+ BSONObj expectedResultSet() final {
+ BSONArrayBuilder result;
+ for (int i = 0; i < kCount; ++i) {
+ for (int j = 0; j < kCount; ++j) {
+ result.append(BSON("_id" << BSON("x" << i << "y" << j) << "sum"
+ << (kCount * (kCount - 1)) / 2));
+ }
+ }
+ return result.arr();
+ }
+};
+
+class StreamingMultipleMonotonicFields final : public CheckResultsBase {
+public:
+ StreamingMultipleMonotonicFields() : CheckResultsBase(GroupStageType::Streaming) {}
+
+private:
+ static constexpr int kCount = 6;
+ deque<DocumentSource::GetNextResult> inputData() final {
+ deque<DocumentSource::GetNextResult> queue;
+ generateInputOutput([&queue](int x, int y) {
+ for (int i = 0; i < kCount; ++i) {
+ queue.emplace_back(Document(BSON("x" << x << "y" << y << "z" << i)));
+ }
+ });
+ return queue;
+ }
+
+ BSONObj groupSpec() final {
+ BSONObjBuilder builder;
+
+ BSONObjBuilder id(builder.subobjStart("_id"));
+ id.append("x", "$x").append("y", "$y").done();
+
+ BSONObjBuilder accum(builder.subobjStart("sum"));
+ accum.append("$sum", "$z").done();
+
+ BSONArrayBuilder sub(builder.subarrayStart("$monotonicIdFields"));
+ sub.append("x").append("y").done();
+
+ return builder.done();
+ }
+
+ boost::optional<size_t> getMaxMemoryUsageBytes() final {
+ return 10 * kBigStringSize;
+ }
+
+ BSONObj expectedResultSet() final {
+ BSONArrayBuilder result;
+ generateInputOutput([&result](int x, int y) {
+ BSONObjBuilder builder(result.subobjStart());
+
+ BSONObjBuilder id(builder.subobjStart("_id"));
+ id.append("x", x).append("y", y).done();
+
+ builder.append("sum", (kCount * (kCount - 1)) / 2);
+ builder.done();
+ });
+ return result.arr();
+ }
+
+ template <typename Callback>
+ void generateInputOutput(const Callback& callback) {
+ int x = 0;
+ int y = 0;
+ for (int i = 0; i < kCount; ++i) {
+ callback(x, y);
+ int state = i % 3;
+ if (state == 0) {
+ x++;
+ } else if (state == 1) {
+ y++;
+ } else {
+ x++;
+ y++;
+ }
+ }
+ }
+};
+
class All : public OldStyleSuiteSpecification {
public:
All() : OldStyleSuiteSpecification("DocumentSourceGroupTests") {}
@@ -910,6 +1237,14 @@ public:
add<Dependencies>();
add<StringConstantIdAndAccumulatorExpressions>();
add<ArrayConstantAccumulatorExpression>();
+
+ add<StreamingSimple>();
+ add<WithoutStreamingSpills>();
+ add<StreamingDoesNotSpill>();
+ add<StreamingCanSpill>();
+ add<StreamingAlternatingSpillAndNoSpillBatches>();
+ add<StreamingComplex>();
+ add<StreamingMultipleMonotonicFields>();
#if 0
// Disabled tests until SERVER-23318 is implemented.
add<StreamingOptimization>();
diff --git a/src/mongo/db/pipeline/document_source_streaming_group.cpp b/src/mongo/db/pipeline/document_source_streaming_group.cpp
new file mode 100644
index 00000000000..2d055900c90
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_streaming_group.cpp
@@ -0,0 +1,256 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/exec/document_value/value_comparator.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_streaming_group.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/expression_dependencies.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/util/destructor_guard.h"
+
+namespace mongo {
+
+/*
+ * $_internalStreamingGroup is an internal stage that is only used in certain cases by the
+ * pipeline optimizer. For now it should not be used anywhere outside the MongoDB server.
+ */
+REGISTER_DOCUMENT_SOURCE(_internalStreamingGroup,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceStreamingGroup::createFromBson,
+ AllowedWithApiStrict::kAlways);
+
+constexpr StringData DocumentSourceStreamingGroup::kStageName;
+
+const char* DocumentSourceStreamingGroup::getSourceName() const {
+ return kStageName.rawData();
+}
+
+DocumentSource::GetNextResult DocumentSourceStreamingGroup::doGetNext() {
+ auto getReadyResult = getNextReadyGroup();
+ if (!getReadyResult.isEOF()) {
+ return getReadyResult;
+ } else if (_sourceDepleted) {
+ dispose();
+ return getReadyResult;
+ }
+
+ auto prepareResult = readyNextBatch();
+ if (prepareResult.isPaused()) {
+ return prepareResult;
+ }
+ return getNextReadyGroup();
+}
+
+DocumentSourceStreamingGroup::DocumentSourceStreamingGroup(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes)
+ : DocumentSourceGroupBase(kStageName, expCtx, maxMemoryUsageBytes), _sourceDepleted(false) {}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceStreamingGroup::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return createFromBsonWithMaxMemoryUsage(std::move(elem), expCtx, boost::none);
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceStreamingGroup::createFromBsonWithMaxMemoryUsage(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes) {
+ boost::intrusive_ptr<DocumentSourceStreamingGroup> groupStage =
+ new DocumentSourceStreamingGroup(expCtx, maxMemoryUsageBytes);
+ groupStage->initializeFromBson(elem);
+
+ const auto& monotonicIdFieldsElem = elem.Obj().getField(kMonotonicIdFieldsSpecField);
+ uassert(7026702,
+ "streaming group must specify an array of monotonic id fields " +
+ kMonotonicIdFieldsSpecField,
+ monotonicIdFieldsElem.type() == Array);
+ const auto& monotonicIdFields = monotonicIdFieldsElem.Array();
+ if (groupStage->_idFieldNames.empty()) {
+ uassert(7026703,
+ "if there is no explicit id fields, " + kMonotonicIdFieldsSpecField +
+ " must contain a single \"_id\" string",
+ monotonicIdFields.size() == 1 &&
+ monotonicIdFields[0].valueStringDataSafe() == "_id"_sd);
+ groupStage->_monotonicExpressionIndexes.push_back(0);
+ } else {
+ groupStage->_monotonicExpressionIndexes.reserve(monotonicIdFields.size());
+ for (const auto& fieldNameElem : monotonicIdFields) {
+ uassert(7026704,
+ kMonotonicIdFieldsSpecField + " elements must be strings",
+ fieldNameElem.type() == String);
+ StringData fieldName = fieldNameElem.valueStringData();
+ auto it = std::find(
+ groupStage->_idFieldNames.begin(), groupStage->_idFieldNames.end(), fieldName);
+ uassert(7026705, "id field not found", it != groupStage->_idFieldNames.end());
+ groupStage->_monotonicExpressionIndexes.push_back(
+ std::distance(groupStage->_idFieldNames.begin(), it));
+ }
+ std::sort(groupStage->_monotonicExpressionIndexes.begin(),
+ groupStage->_monotonicExpressionIndexes.end());
+ }
+
+ return groupStage;
+}
+
+void DocumentSourceStreamingGroup::serializeAdditionalFields(
+ MutableDocument& out, boost::optional<ExplainOptions::Verbosity> explain) const {
+ std::vector<Value> monotonicIdFields;
+ if (_idFieldNames.empty()) {
+ monotonicIdFields.emplace_back("_id"_sd);
+ } else {
+ for (size_t i : _monotonicExpressionIndexes) {
+ monotonicIdFields.emplace_back(_idFieldNames[i]);
+ }
+ }
+ out[kMonotonicIdFieldsSpecField] = Value(std::move(monotonicIdFields));
+}
+
+bool DocumentSourceStreamingGroup::isSpecFieldReserved(StringData fieldName) {
+ return fieldName == kMonotonicIdFieldsSpecField;
+}
+
+DocumentSource::GetNextResult DocumentSourceStreamingGroup::getNextDocument() {
+ if (_firstDocumentOfNextBatch) {
+ GetNextResult result = std::move(_firstDocumentOfNextBatch.value());
+ _firstDocumentOfNextBatch.reset();
+ return result;
+ }
+ return pSource->getNext();
+}
+
+DocumentSource::GetNextResult DocumentSourceStreamingGroup::readyNextBatch() {
+ resetReadyGroups();
+ GetNextResult input = getNextDocument();
+ return readyNextBatchInner(input);
+}
+
+// This separate NOINLINE function is used here to decrease stack utilization of readyNextBatch()
+// and prevent stack overflows.
+MONGO_COMPILER_NOINLINE DocumentSource::GetNextResult
+DocumentSourceStreamingGroup::readyNextBatchInner(GetNextResult input) {
+ setExecutionStarted();
+ // Calculate groups until we either exaust pSource or encounter change in monotonic id
+ // expression, which means all current groups are finalized.
+ for (; input.isAdvanced(); input = pSource->getNext()) {
+ if (shouldSpillWithAttemptToSaveMemory()) {
+ spill();
+ }
+ auto root = input.releaseDocument();
+ Value id = computeId(root);
+
+ if (isBatchFinished(id)) {
+ _firstDocumentOfNextBatch = std::move(root);
+ readyGroups();
+ return input;
+ }
+
+ processDocument(id, root);
+ }
+
+ switch (input.getStatus()) {
+ case DocumentSource::GetNextResult::ReturnStatus::kAdvanced: {
+ MONGO_UNREACHABLE; // We consumed all advances above.
+ }
+ case DocumentSource::GetNextResult::ReturnStatus::kPauseExecution: {
+ return input; // Propagate pause.
+ }
+ case DocumentSource::GetNextResult::ReturnStatus::kEOF: {
+ readyGroups();
+ _sourceDepleted = true;
+ return input;
+ }
+ }
+ MONGO_UNREACHABLE;
+}
+
+bool DocumentSourceStreamingGroup::isBatchFinished(const Value& id) {
+ if (_idExpressions.size() == 1) {
+ tassert(7026706,
+ "if there are no explicit id fields, it is only one monotonic expression with id 0",
+ _monotonicExpressionIndexes.size() == 1 && _monotonicExpressionIndexes[0] == 0);
+ return checkForBatchEndAndUpdateLastIdValues([&](size_t) { return id; });
+ } else {
+ tassert(7026707,
+ "if there are explicit id fields, internal representation of id is an array",
+ id.isArray());
+ const std::vector<Value>& idValues = id.getArray();
+ return checkForBatchEndAndUpdateLastIdValues([&](size_t i) { return idValues[i]; });
+ }
+}
+
+template <typename IdValueGetter>
+bool DocumentSourceStreamingGroup::checkForBatchEndAndUpdateLastIdValues(
+ const IdValueGetter& idValueGetter) {
+ auto assertStreamable = [&](Value value) {
+ // Nullish and array values will mess us up because they sort differently than they group.
+ // A null and a missing value will compare equal in sorting, but could result in different
+ // groups, e.g. {_id: {x: null, y: null}} vs {_id: {}}. An array value will sort by the min
+ // or max element, with no tie breaking, but group by the whole array. This means that two
+ // of the exact same array could appear in the input sequence, but with a different array in
+ // the middle of them, and that would still be considered sorted. That would break our
+ // batching group logic.
+ tassert(7026708,
+ "Monotonic value should not be missing, null or an array",
+ !value.nullish() && !value.isArray());
+ return value;
+ };
+
+ // If _lastMonotonicIdFieldValues is empty, it is the first document, so the only thing we need
+ // to do is initialize it.
+ if (_lastMonotonicIdFieldValues.empty()) {
+ for (size_t i : _monotonicExpressionIndexes) {
+ _lastMonotonicIdFieldValues.push_back(assertStreamable(idValueGetter(i)));
+ }
+ return false;
+ } else {
+ bool batchFinished = false;
+ for (size_t index = 0; index < _monotonicExpressionIndexes.size(); ++index) {
+ Value& oldId = _lastMonotonicIdFieldValues[index];
+ const Value& id = assertStreamable(idValueGetter(_monotonicExpressionIndexes[index]));
+ if (pExpCtx->getValueComparator().compare(oldId, id) != 0) {
+ oldId = id;
+ batchFinished = true;
+ }
+ }
+ return batchFinished;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_streaming_group.h b/src/mongo/db/pipeline/document_source_streaming_group.h
new file mode 100644
index 00000000000..c939a6728f2
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_streaming_group.h
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/db/pipeline/document_source_group_base.h"
+
+namespace mongo {
+
+/**
+ * This class represents streaming group implementation that can only be used when at least one of
+ * _id fields is monotonic. It stores and output groups in batches. All groups in the batch has
+ * the same value of monotonic id fields.
+ *
+ * For example, if the inputs are sorted by "x", we could use a batched streaming algorithm to
+ * perform the grouping for {$group: {_id: {x: "$x", y: "$y"}}}.
+ *
+ * Groups are processes in batches. One batch corresponds to a set of groups when each monotonic
+ * id field have the same value. Non-monotonic fields can have different values, so we still may
+ * have multiple groups and even spill to disk, but we still consume significanty less memory
+ * than general hash based group.
+ * When a document with a different value in at least one group id field is encountered, it is
+ * cached in '_firstDocumentOfNextBatch', current groups are finalized and returned in
+ * subsequent getNext() called and when the current batch is depleted, memory is freeed and the
+ * process starts again.
+ *
+ * TODO SERVER-71437 Implement an optimization for a special case where all group fields are
+ * monotonic
+ * - we don't need any hashing in this case.
+ */
+class DocumentSourceStreamingGroup final : public DocumentSourceGroupBase {
+public:
+ static constexpr StringData kStageName = "$_internalStreamingGroup"_sd;
+
+ const char* getSourceName() const final;
+
+ /**
+ * Parses 'elem' into a $_internalStreamingGroup stage, or throws a AssertionException if 'elem'
+ * was an invalid specification.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes);
+
+protected:
+ GetNextResult doGetNext() final;
+
+ bool isSpecFieldReserved(StringData fieldName) final;
+ void serializeAdditionalFields(MutableDocument& out,
+ boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+private:
+ static constexpr StringData kMonotonicIdFieldsSpecField = "$monotonicIdFields"_sd;
+
+ explicit DocumentSourceStreamingGroup(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes = boost::none);
+
+
+ GetNextResult getNextDocument();
+
+ GetNextResult readyNextBatch();
+ /**
+ * Readies next batch after all children are initialized. See readyNextBatch() for
+ * more details.
+ */
+ GetNextResult readyNextBatchInner(GetNextResult input);
+
+ bool isBatchFinished(const Value& id);
+
+ template <typename IdValueGetter>
+ bool checkForBatchEndAndUpdateLastIdValues(const IdValueGetter& idValueGetter);
+
+ std::vector<size_t> _monotonicExpressionIndexes;
+ std::vector<Value> _lastMonotonicIdFieldValues;
+
+ boost::optional<Document> _firstDocumentOfNextBatch;
+
+ bool _sourceDepleted;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/group_from_first_document_transformation.cpp b/src/mongo/db/pipeline/group_from_first_document_transformation.cpp
new file mode 100644
index 00000000000..411754eb744
--- /dev/null
+++ b/src/mongo/db/pipeline/group_from_first_document_transformation.cpp
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/group_from_first_document_transformation.h"
+
+namespace mongo {
+Document GroupFromFirstDocumentTransformation::applyTransformation(const Document& input) {
+ MutableDocument output(_accumulatorExprs.size());
+
+ for (auto&& expr : _accumulatorExprs) {
+ auto value = expr.second->evaluate(input, &expr.second->getExpressionContext()->variables);
+ output.addField(expr.first, value.missing() ? Value(BSONNULL) : std::move(value));
+ }
+
+ return output.freeze();
+}
+
+void GroupFromFirstDocumentTransformation::optimize() {
+ for (auto&& expr : _accumulatorExprs) {
+ expr.second = expr.second->optimize();
+ }
+}
+
+Document GroupFromFirstDocumentTransformation::serializeTransformation(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+
+ MutableDocument newRoot(_accumulatorExprs.size());
+ for (auto&& expr : _accumulatorExprs) {
+ newRoot.addField(expr.first, expr.second->serialize(static_cast<bool>(explain)));
+ }
+
+ return {{"newRoot", newRoot.freezeToValue()}};
+}
+
+DepsTracker::State GroupFromFirstDocumentTransformation::addDependencies(DepsTracker* deps) const {
+ for (auto&& expr : _accumulatorExprs) {
+ expression::addDependencies(expr.second.get(), deps);
+ }
+
+ // This stage will replace the entire document with a new document, so any existing fields
+ // will be replaced and cannot be required as dependencies. We use EXHAUSTIVE_ALL here
+ // instead of EXHAUSTIVE_FIELDS, as in ReplaceRootTransformation, because the stages that
+ // follow a $group stage should not depend on document metadata.
+ return DepsTracker::State::EXHAUSTIVE_ALL;
+}
+
+void GroupFromFirstDocumentTransformation::addVariableRefs(std::set<Variables::Id>* refs) const {
+ for (auto&& expr : _accumulatorExprs) {
+ expression::addVariableRefs(expr.second.get(), refs);
+ }
+}
+
+DocumentSource::GetModPathsReturn GroupFromFirstDocumentTransformation::getModifiedPaths() const {
+ // Replaces the entire root, so all paths are modified.
+ return {DocumentSource::GetModPathsReturn::Type::kAllPaths, OrderedPathSet{}, {}};
+}
+
+std::unique_ptr<GroupFromFirstDocumentTransformation> GroupFromFirstDocumentTransformation::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const std::string& groupId,
+ StringData originalStageName,
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs) {
+ return std::make_unique<GroupFromFirstDocumentTransformation>(
+ groupId, originalStageName, std::move(accumulatorExprs));
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/group_from_first_document_transformation.h b/src/mongo/db/pipeline/group_from_first_document_transformation.h
new file mode 100644
index 00000000000..a7beafc4989
--- /dev/null
+++ b/src/mongo/db/pipeline/group_from_first_document_transformation.h
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/transformer_interface.h"
+
+namespace mongo {
+
+/**
+ * GroupFromFirstTransformation consists of a list of (field name, expression pairs). It returns a
+ * document synthesized by assigning each field name in the output document to the result of
+ * evaluating the corresponding expression. If the expression evaluates to missing, we assign a
+ * value of BSONNULL. This is necessary to match the semantics of $first for missing fields.
+ */
+class GroupFromFirstDocumentTransformation final : public TransformerInterface {
+public:
+ GroupFromFirstDocumentTransformation(
+ const std::string& groupId,
+ StringData originalStageName,
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs)
+ : _accumulatorExprs(std::move(accumulatorExprs)),
+ _groupId(groupId),
+ _originalStageName(originalStageName) {}
+
+ TransformerType getType() const final {
+ return TransformerType::kGroupFromFirstDocument;
+ }
+
+ /**
+ * The path of the field that we are grouping on: i.e., the field in the input document that we
+ * will use to create the _id field of the ouptut document.
+ */
+ const std::string& groupId() const {
+ return _groupId;
+ }
+
+ StringData originalStageName() const {
+ return _originalStageName;
+ }
+
+ Document applyTransformation(const Document& input) final;
+
+ void optimize() final;
+
+ Document serializeTransformation(
+ boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+ DepsTracker::State addDependencies(DepsTracker* deps) const final;
+
+ void addVariableRefs(std::set<Variables::Id>* refs) const final;
+
+ DocumentSource::GetModPathsReturn getModifiedPaths() const final;
+
+ static std::unique_ptr<GroupFromFirstDocumentTransformation> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const std::string& groupId,
+ StringData originalStageName,
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs);
+
+private:
+ std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs;
+ std::string _groupId;
+ StringData _originalStageName;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ec84917b5c5..0998f2a6f1d 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -826,10 +826,10 @@ namespace {
* the case of a $sort with a non-null value for getLimitSrc(), indicating that there was previously
* a $limit stage that was optimized away.
*/
-std::pair<boost::intrusive_ptr<DocumentSourceSort>, boost::intrusive_ptr<DocumentSourceGroup>>
+std::pair<boost::intrusive_ptr<DocumentSourceSort>, boost::intrusive_ptr<DocumentSourceGroupBase>>
getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) {
boost::intrusive_ptr<DocumentSourceSort> sortStage = nullptr;
- boost::intrusive_ptr<DocumentSourceGroup> groupStage = nullptr;
+ boost::intrusive_ptr<DocumentSourceGroupBase> groupStage = nullptr;
auto sourcesIt = sources.begin();
if (sourcesIt != sources.end()) {
@@ -845,7 +845,7 @@ getSortAndGroupStagesFromPipeline(const Pipeline::SourceContainer& sources) {
}
if (sourcesIt != sources.end()) {
- groupStage = dynamic_cast<DocumentSourceGroup*>(sourcesIt->get());
+ groupStage = dynamic_cast<DocumentSourceGroupBase*>(sourcesIt->get());
}
return std::make_pair(sortStage, groupStage);
@@ -1688,7 +1688,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// will handle the sort, and the groupTransform (added below) will handle the $group
// stage.
pipeline->popFrontWithName(DocumentSourceSort::kStageName);
- pipeline->popFrontWithName(DocumentSourceGroup::kStageName);
+ pipeline->popFrontWithName(rewrittenGroupStage->originalStageName());
boost::intrusive_ptr<DocumentSource> groupTransform(
new DocumentSourceSingleDocumentTransformation(