diff options
author | Benjamin Murphy <benjamin_murphy@me.com> | 2016-02-10 17:24:21 -0500 |
---|---|---|
committer | Benjamin Murphy <benjamin_murphy@me.com> | 2016-03-24 11:05:32 -0400 |
commit | f4bbde02bab191cdba4195ec9ad73c60d4aece41 (patch) | |
tree | ce812891454e6d5ed946813452685f7590b8efbf /src/mongo/db/pipeline/document_source_group.cpp | |
parent | f40294818ce8690f1a485ca32ea52e33e137b7ea (diff) | |
download | mongo-f4bbde02bab191cdba4195ec9ad73c60d4aece41.tar.gz |
SERVER-4507 Group stages now take advantage of sorted input sequences.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 435 |
1 files changed, 357 insertions, 78 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 5a98ccbabec..1d33a337024 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -28,7 +28,6 @@ #include "mongo/platform/basic.h" - #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" @@ -53,82 +52,125 @@ const char* DocumentSourceGroup::getSourceName() const { boost::optional<Document> DocumentSourceGroup::getNext() { pExpCtx->checkForInterrupt(); - if (!populated) - populate(); + if (!_initialized) + initialize(); - if (_spilled) { - if (!_sorterIterator) - return boost::none; + for (auto&& accum : _currentAccumulators) { + accum->reset(); // Prep accumulators for a new group. + } - const size_t numAccumulators = vpAccumulatorFactory.size(); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators[i]->reset(); // prep accumulators for a new group - } + if (_spilled) { + return getNextSpilled(); + } else if (_streaming) { + return getNextStreaming(); + } else { + return getNextStandard(); + } +} - _currentId = _firstPartOfNextGroup.first; - while (_currentId == _firstPartOfNextGroup.first) { - // Inside of this loop, _firstPartOfNextGroup is the current data being processed. - // At loop exit, it is the first value to be processed in the next group. - - switch (numAccumulators) { // mirrors switch in spill() - case 0: // no Accumulators so no Values - break; - - case 1: // single accumulators serialize as a single Value - _currentAccumulators[0]->process(_firstPartOfNextGroup.second, - /*merging=*/true); - break; - - default: { // multiple accumulators serialize as an array - const vector<Value>& accumulatorStates = - _firstPartOfNextGroup.second.getArray(); - for (size_t i = 0; i < numAccumulators; i++) { - _currentAccumulators[i]->process(accumulatorStates[i], - /*merging=*/true); - } - break; - } - } +boost::optional<Document> DocumentSourceGroup::getNextSpilled() { + // We aren't streaming, and we have spilled to disk. + if (!_sorterIterator) + return boost::none; - if (!_sorterIterator->more()) { - dispose(); + _currentId = _firstPartOfNextGroup.first; + const size_t numAccumulators = vpAccumulatorFactory.size(); + while (_currentId == _firstPartOfNextGroup.first) { + // Inside of this loop, _firstPartOfNextGroup is the current data being processed. + // At loop exit, it is the first value to be processed in the next group. + switch (numAccumulators) { // mirrors switch in spill() + case 1: // Single accumulators serialize as a single Value. + _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true); + case 0: // No accumulators so no Values. break; + default: { // Multiple accumulators serialize as an array of Values. + const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray(); + for (size_t i = 0; i < numAccumulators; i++) { + _currentAccumulators[i]->process(accumulatorStates[i], true); + } } + } - _firstPartOfNextGroup = _sorterIterator->next(); + if (!_sorterIterator->more()) { + dispose(); + break; } - return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard); + _firstPartOfNextGroup = _sorterIterator->next(); + } - } else { - if (groups.empty()) - return boost::none; + return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard); +} - Document out = - makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard); +boost::optional<Document> DocumentSourceGroup::getNextStandard() { + // Not spilled, and not streaming. + if (groups.empty()) + return boost::none; - if (++groupsIterator == groups.end()) - dispose(); + Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard); + + if (++groupsIterator == groups.end()) + dispose(); + + return out; +} - return out; +boost::optional<Document> DocumentSourceGroup::getNextStreaming() { + // Streaming optimization is active. + if (!_firstDocOfNextGroup) { + dispose(); + return boost::none; } + + Value id; + do { + // Add to the current accumulator(s). + for (size_t i = 0; i < _currentAccumulators.size(); i++) { + _currentAccumulators[i]->process(vpExpression[i]->evaluate(_variables.get()), + _doingMerge); + } + + // Release our references to the previous input document before asking for the next. This + // makes operations like $unwind more efficient. + _variables->clearRoot(); + _firstDocOfNextGroup = {}; + + // Retrieve the next document. + _firstDocOfNextGroup = pSource->getNext(); + if (!_firstDocOfNextGroup) { + break; + } + + _variables->setRoot(*_firstDocOfNextGroup); + + // Compute the id. If it does not match _currentId, we will exit the loop, leaving + // _firstDocOfNextGroup set for the next time getNext() is called. + id = computeId(_variables.get()); + } while (_currentId == id); + + Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard); + _currentId = std::move(id); + + return out; } void DocumentSourceGroup::dispose() { - // free our resources + // Free our resources. GroupsMap().swap(groups); _sorterIterator.reset(); - // make us look done + // Make us look done. groupsIterator = groups.end(); - // free our source's resources + _firstDocOfNextGroup = boost::none; + + // Free our source's resources. pSource->dispose(); } intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { - // TODO if all _idExpressions are ExpressionConstants after optimization, then we know there - // will only be one group. We should take advantage of that to avoid going through the hash + // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there + // will be only one group. We should take advantage of that to avoid going through the hash // table. for (size_t i = 0; i < _idExpressions.size(); i++) { _idExpressions[i] = _idExpressions[i]->optimize(); @@ -144,12 +186,12 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { Value DocumentSourceGroup::serialize(bool explain) const { MutableDocument insides; - // add the _id + // Add the _id. if (_idFieldNames.empty()) { invariant(_idExpressions.size() == 1); insides["_id"] = _idExpressions[0]->serialize(explain); } else { - // decomposed document case + // Decomposed document case. invariant(_idExpressions.size() == _idFieldNames.size()); MutableDocument md; for (size_t i = 0; i < _idExpressions.size(); i++) { @@ -158,7 +200,7 @@ Value DocumentSourceGroup::serialize(bool explain) const { insides["_id"] = md.freezeToValue(); } - // add the remaining fields + // Add the remaining fields. const size_t n = vFieldName.size(); for (size_t i = 0; i < n; ++i) { intrusive_ptr<Accumulator> accum = vpAccumulatorFactory[i](); @@ -169,10 +211,12 @@ Value DocumentSourceGroup::serialize(bool explain) const { if (_doingMerge) { // This makes the output unparsable (with error) on pre 2.6 shards, but it will never // be sent to old shards when this flag is true since they can't do a merge anyway. - insides["$doingMerge"] = Value(true); } + if (explain && findRelevantInputSort()) { + return Value(DOC("$streamingGroup" << insides.freeze())); + } return Value(DOC(getSourceName() << insides.freeze())); } @@ -193,17 +237,18 @@ DocumentSource::GetDepsReturn DocumentSourceGroup::getDependencies(DepsTracker* intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create( const intrusive_ptr<ExpressionContext>& pExpCtx) { - intrusive_ptr<DocumentSourceGroup> pSource(new DocumentSourceGroup(pExpCtx)); - return pSource; + return new DocumentSourceGroup(pExpCtx); } DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), - populated(false), _doingMerge(false), + _maxMemoryUsageBytes(100 * 1024 * 1024), + _inputSort(BSONObj()), + _streaming(false), + _initialized(false), _spilled(false), - _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter), - _maxMemoryUsageBytes(100 * 1024 * 1024) {} + _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {} void DocumentSourceGroup::addAccumulator(const std::string& fieldName, Accumulator::Factory accumulatorFactory, @@ -296,6 +341,9 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson( } namespace { + +using GroupsMap = DocumentSourceGroup::GroupsMap; + class SorterComparator { public: typedef pair<Value, Value> Data; @@ -303,10 +351,107 @@ public: return Value::compare(lhs.first, rhs.first); } }; + +class SpillSTLComparator { +public: + bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const { + return Value::compare(lhs->first, rhs->first) < 0; + } +}; + +bool containsOnlyFieldPathsAndConstants(ExpressionObject* expressionObj) { + for (auto&& it : expressionObj->getChildExpressions()) { + const intrusive_ptr<Expression>& childExp = it.second; + if (dynamic_cast<ExpressionFieldPath*>(childExp.get())) { + continue; + } else if (dynamic_cast<ExpressionConstant*>(childExp.get())) { + continue; + } else if (auto expObj = dynamic_cast<ExpressionObject*>(childExp.get())) { + if (!containsOnlyFieldPathsAndConstants(expObj)) { + // A nested expression was not a FieldPath or a constant. + return false; + } + } else { + // expressionObj was something other than a FieldPath, a constant, or a nested object. + return false; + } + } + return true; +} + +void getFieldPathMap(ExpressionObject* expressionObj, + std::string prefix, + StringMap<std::string>* fields) { + // Given an expression with only constant and FieldPath leaf nodes, such as {x: {y: "$a.b"}}, + // attempt to compute a map from each FieldPath leaf to the path of that leaf. In the example, + // this method would return: {"a.b" : "x.y"}. + + for (auto&& it : expressionObj->getChildExpressions()) { + intrusive_ptr<Expression> childExp = it.second; + ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get()); + ExpressionFieldPath* expPath = dynamic_cast<ExpressionFieldPath*>(childExp.get()); + + std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first; + + if (expObj) { + getFieldPathMap(expObj, newPrefix, fields); + } else if (expPath) { + (*fields)[expPath->getFieldPath().tail().getPath(false)] = newPrefix; + } + } +} + +void getFieldPathListForSpilled(ExpressionObject* expressionObj, + std::string prefix, + std::vector<std::string>* fields) { + // Given an expression, attempt to compute a vector of strings, each representing the path + // through the object to a leaf. For example, for the expression represented by + // {x: 2, y: {z: "$a.b"}}, the output would be ["x", "y.z"]. + for (auto&& it : expressionObj->getChildExpressions()) { + intrusive_ptr<Expression> childExp = it.second; + ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get()); + + std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first; + + if (expObj) { + getFieldPathListForSpilled(expObj, newPrefix, fields); + } else { + fields->push_back(newPrefix); + } + } } +} // namespace -void DocumentSourceGroup::populate() { +void DocumentSourceGroup::initialize() { + _initialized = true; const size_t numAccumulators = vpAccumulatorFactory.size(); + + boost::optional<BSONObj> inputSort = findRelevantInputSort(); + if (inputSort) { + // We can convert to streaming. + _streaming = true; + _inputSort = *inputSort; + + // Set up accumulators. + _currentAccumulators.reserve(numAccumulators); + for (size_t i = 0; i < numAccumulators; i++) { + _currentAccumulators.push_back(vpAccumulatorFactory[i]()); + } + + // We only need to load the first document. + _firstDocOfNextGroup = pSource->getNext(); + + if (!_firstDocOfNextGroup) { + return; + } + + _variables->setRoot(*_firstDocOfNextGroup); + + // Compute the _id value. + _currentId = computeId(_variables.get()); + return; + } + dassert(numAccumulators == vpExpression.size()); // pushed to on spill() @@ -329,10 +474,6 @@ void DocumentSourceGroup::populate() { /* get the _id value */ Value id = computeId(_variables.get()); - /* treat missing values the same as NULL SERVER-4674 */ - if (id.missing()) - id = Value(BSONNULL); - /* Look for the _id value in the map; if it's not there, add a new entry with a blank accumulator. @@ -406,17 +547,8 @@ void DocumentSourceGroup::populate() { // start the group iterator groupsIterator = groups.begin(); } - - populated = true; } -class DocumentSourceGroup::SpillSTLComparator { -public: - bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const { - return Value::compare(lhs->first, rhs->first) < 0; - } -}; - shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting ptrs.reserve(groups.size()); @@ -457,6 +589,151 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done()); } +boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { + if (true) { + // Until streaming $group correctly handles nullish values, the streaming behavior is + // disabled. See SERVER-23318. + return boost::none; + } + + if (!pSource) { + // Sometimes when performing an explain, or using $group as the merge point, 'pSource' will + // not be set. + return boost::none; + } + + BSONObjSet sorts = pSource->getOutputSorts(); + + // 'sorts' is a BSONObjSet. We need to check if our group pattern is compatible with one of the + // input sort patterns. + + // We will only attempt to take advantage of a sorted input stream if the _id given to the + // $group contained only FieldPaths or constants. Determine if this is the case, and extract + // those FieldPaths if it is. + DepsTracker deps; + for (auto&& exp : _idExpressions) { + if (dynamic_cast<ExpressionConstant*>(exp.get())) { + continue; + } + ExpressionObject* obj; + if ((obj = dynamic_cast<ExpressionObject*>(exp.get()))) { + // We can only perform an optimization if there are no operators in the _id expression. + if (!containsOnlyFieldPathsAndConstants(obj)) { + return boost::none; + } + } else if (!dynamic_cast<ExpressionFieldPath*>(exp.get())) { + return boost::none; + } + exp->addDependencies(&deps); + } + + if (deps.needWholeDocument) { + // We don't swap to streaming if we need the entire document, which is likely because of + // $$ROOT. + return boost::none; + } + + if (deps.fields.empty()) { + // Our _id field is constant, so we should stream, but the input sort we choose is + // irrelevant since we will output only one document. + return BSONObj(); + } + + for (auto&& obj : sorts) { + // Note that a sort order of, e.g., {a: 1, b: 1, c: 1} allows us to do a non-blocking group + // for every permutation of group by (a, b, c), since we are guaranteed that documents with + // the same value of (a, b, c) will be consecutive in the input stream, no matter what our + // _id is. + std::set<std::string> fieldNames; + obj.getFieldNames(fieldNames); + if (fieldNames == deps.fields) { + return obj; + } + } + + return boost::none; +} + +BSONObjSet DocumentSourceGroup::getOutputSorts() { + if (!_initialized) { + initialize(); + } + + if (!(_streaming || _spilled)) { + return BSONObjSet(); + } + + BSONObjBuilder sortOrder; + + if (_idFieldNames.empty()) { + if (_spilled) { + sortOrder.append("_id", 1); + } else { + // We have an expression like {_id: "$a"}. Check if this is a FieldPath, and if it is, + // get the sort order out of it. + if (auto obj = dynamic_cast<ExpressionFieldPath*>(_idExpressions[0].get())) { + FieldPath _idSort = obj->getFieldPath(); + + sortOrder.append( + "_id", + _inputSort.getIntField(_idSort.getFieldName(_idSort.getPathLength() - 1))); + } + } + } else if (_streaming) { + // At this point, we know that _streaming is true, so _id must have only contained + // ExpressionObjects, ExpressionConstants or ExpressionFieldPaths. We now process each + // '_idExpression'. + + // We populate 'fieldMap' such that each key is a field the input is sorted by, and the + // value is where that input field is located within the _id document. For example, if our + // _id object is {_id: {x: {y: "$a.b"}}}, 'fieldMap' would be: {'a.b': '_id.x.y'}. + StringMap<std::string> fieldMap; + for (size_t i = 0; i < _idFieldNames.size(); i++) { + intrusive_ptr<Expression> exp = _idExpressions[i]; + if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) { + // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}. + getFieldPathMap(obj, "_id." + _idFieldNames[i], &fieldMap); + } else if (auto fieldPath = dynamic_cast<ExpressionFieldPath*>(exp.get())) { + FieldPath _idSort = fieldPath->getFieldPath(); + fieldMap[_idSort.getFieldName(_idSort.getPathLength() - 1)] = + "_id." + _idFieldNames[i]; + } + } + + // Because the order of '_inputSort' is important, we go through each field we are sorted on + // and append it to the BSONObjBuilder in order. + for (BSONElement sortField : _inputSort) { + std::string sortString = sortField.fieldNameStringData().toString(); + + auto itr = fieldMap.find(sortString); + + // If our sort order is (a, b, c), we could not have converted to a streaming $group if + // our _id was predicated on (a, c) but not 'b'. Verify that this is true. + invariant(itr != fieldMap.end()); + + sortOrder.append(itr->second, _inputSort.getIntField(sortString)); + } + } else { + // We are blocking and have spilled to disk. + std::vector<std::string> outputSort; + for (size_t i = 0; i < _idFieldNames.size(); i++) { + intrusive_ptr<Expression> exp = _idExpressions[i]; + if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) { + // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}. + getFieldPathListForSpilled(obj, "_id." + _idFieldNames[i], &outputSort); + } else { + outputSort.push_back("_id." + _idFieldNames[i]); + } + } + for (auto&& field : outputSort) { + sortOrder.append(field, 1); + } + } + + return allPrefixes(sortOrder.obj()); +} + + void DocumentSourceGroup::parseIdExpression(BSONElement groupField, const VariablesParseState& vps) { if (groupField.type() == Object && !groupField.Obj().isEmpty()) { @@ -469,7 +746,7 @@ void DocumentSourceGroup::parseIdExpression(BSONElement groupField, _idExpressions.push_back(Expression::parseObject(idKeyObj, &oCtx, vps)); } else { // grouping on an "artificial" object. Rather than create the object for each input - // in populate(), instead group on the output of the raw expressions. The artificial + // in initialize(), instead group on the output of the raw expressions. The artificial // object will be created at the end in makeDocument() while outputting results. BSONForEach(field, idKeyObj) { uassert(17390, @@ -490,9 +767,11 @@ void DocumentSourceGroup::parseIdExpression(BSONElement groupField, } Value DocumentSourceGroup::computeId(Variables* vars) { - // If only one expression return result directly - if (_idExpressions.size() == 1) - return _idExpressions[0]->evaluate(vars); + // If only one expression, return result directly + if (_idExpressions.size() == 1) { + Value retValue = _idExpressions[0]->evaluate(vars); + return retValue.missing() ? Value(BSONNULL) : std::move(retValue); + } // Multiple expressions get results wrapped in a vector vector<Value> vals; |