diff options
author | David Storch <david.storch@10gen.com> | 2019-02-26 17:51:45 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2019-03-11 10:14:57 -0400 |
commit | c0492bf5b77542cbacd73bcfe1e0c999b8078c07 (patch) | |
tree | 47735a9c7aaa58b9af8d646be99700be1426ada9 /src/mongo/db/pipeline/document_source_group.cpp | |
parent | 9a7cfb73da3a86d1c20f674140f1f908e2bae0c8 (diff) | |
download | mongo-c0492bf5b77542cbacd73bcfe1e0c999b8078c07.tar.gz |
SERVER-40056 Remove partial implementation of streaming $group.
The streaming $group optimization was never fully implemented,
so the code was disabled. This patch removes the dead code, including
DocumentSource::getOutputSorts().
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 283 |
1 files changed, 0 insertions, 283 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 839945ff86b..fa006b78b0c 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -149,8 +149,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNext() { if (_spilled) { return getNextSpilled(); - } else if (_streaming) { - return getNextStreaming(); } else { return getNextStandard(); } @@ -203,44 +201,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() { return std::move(out); } -DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() { - // Streaming optimization is active. - if (!_firstDocOfNextGroup) { - auto nextInput = pSource->getNext(); - if (!nextInput.isAdvanced()) { - return nextInput; - } - _firstDocOfNextGroup = nextInput.releaseDocument(); - } - - Value id; - do { - // Add to the current accumulator(s). - for (size_t i = 0; i < _currentAccumulators.size(); i++) { - _currentAccumulators[i]->process( - _accumulatedFields[i].expression->evaluate(*_firstDocOfNextGroup), _doingMerge); - } - - // Retrieve the next document. - auto nextInput = pSource->getNext(); - if (!nextInput.isAdvanced()) { - return nextInput; - } - - _firstDocOfNextGroup = nextInput.releaseDocument(); - - - // Compute the id. If it does not match _currentId, we will exit the loop, leaving - // _firstDocOfNextGroup set for the next time getNext() is called. - id = computeId(*_firstDocOfNextGroup); - } while (pExpCtx->getValueComparator().evaluate(_currentId == id)); - - Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->needsMerge); - _currentId = std::move(id); - - return std::move(out); -} - void DocumentSourceGroup::doDispose() { // Free our resources. _groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>(); @@ -248,8 +208,6 @@ void DocumentSourceGroup::doDispose() { // Make us look done. groupsIterator = _groups->end(); - - _firstDocOfNextGroup = boost::none; } intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() { @@ -298,9 +256,6 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> insides["$doingMerge"] = Value(true); } - if (explain && findRelevantInputSort()) { - return Value(DOC("$streamingGroup" << insides.freeze())); - } return Value(DOC(getSourceName() << insides.freeze())); } @@ -360,8 +315,6 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _doingMerge(false), _maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes : internalDocumentSourceGroupMaxMemoryBytes.load()), - _inputSort(BSONObj()), - _streaming(false), _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), @@ -494,100 +447,11 @@ public: private: ValueComparator _valueComparator; }; - -bool containsOnlyFieldPathsAndConstants(ExpressionObject* expressionObj) { - for (auto&& it : expressionObj->getChildExpressions()) { - const intrusive_ptr<Expression>& childExp = it.second; - if (dynamic_cast<ExpressionFieldPath*>(childExp.get())) { - continue; - } else if (dynamic_cast<ExpressionConstant*>(childExp.get())) { - continue; - } else if (auto expObj = dynamic_cast<ExpressionObject*>(childExp.get())) { - if (!containsOnlyFieldPathsAndConstants(expObj)) { - // A nested expression was not a FieldPath or a constant. - return false; - } - } else { - // expressionObj was something other than a FieldPath, a constant, or a nested object. - return false; - } - } - return true; -} - -void getFieldPathMap(ExpressionObject* expressionObj, - std::string prefix, - StringMap<std::string>* fields) { - // Given an expression with only constant and FieldPath leaf nodes, such as {x: {y: "$a.b"}}, - // attempt to compute a map from each FieldPath leaf to the path of that leaf. In the example, - // this method would return: {"a.b" : "x.y"}. - - for (auto&& it : expressionObj->getChildExpressions()) { - intrusive_ptr<Expression> childExp = it.second; - ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get()); - ExpressionFieldPath* expPath = dynamic_cast<ExpressionFieldPath*>(childExp.get()); - - std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first; - - if (expObj) { - getFieldPathMap(expObj, newPrefix, fields); - } else if (expPath) { - (*fields)[expPath->getFieldPath().tail().fullPath()] = newPrefix; - } - } -} - -void getFieldPathListForSpilled(ExpressionObject* expressionObj, - std::string prefix, - std::vector<std::string>* fields) { - // Given an expression, attempt to compute a vector of strings, each representing the path - // through the object to a leaf. For example, for the expression represented by - // {x: 2, y: {z: "$a.b"}}, the output would be ["x", "y.z"]. - for (auto&& it : expressionObj->getChildExpressions()) { - intrusive_ptr<Expression> childExp = it.second; - ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get()); - - std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first; - - if (expObj) { - getFieldPathListForSpilled(expObj, newPrefix, fields); - } else { - fields->push_back(newPrefix); - } - } -} } // namespace DocumentSource::GetNextResult DocumentSourceGroup::initialize() { const size_t numAccumulators = _accumulatedFields.size(); - boost::optional<BSONObj> inputSort = findRelevantInputSort(); - if (inputSort) { - // We can convert to streaming. - _streaming = true; - _inputSort = *inputSort; - - // Set up accumulators. - _currentAccumulators.reserve(numAccumulators); - for (auto&& accumulatedField : _accumulatedFields) { - _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx)); - } - - // We only need to load the first document. - auto firstInput = pSource->getNext(); - if (!firstInput.isAdvanced()) { - // Leave '_firstDocOfNextGroup' uninitialized and return. - return firstInput; - } - _firstDocOfNextGroup = firstInput.releaseDocument(); - - // Compute the _id value. - _currentId = computeId(*_firstDocOfNextGroup); - _initialized = true; - return DocumentSource::GetNextResult::makeEOF(); - } - - // Barring any pausing, this loop exhausts 'pSource' and populates '_groups'. GetNextResult input = pSource->getNext(); for (; input.isAdvanced(); input = pSource->getNext()) { @@ -744,153 +608,6 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() { return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr); } -boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { - if (true) { - // Until streaming $group correctly handles nullish values, the streaming behavior is - // disabled. See SERVER-23318. - return boost::none; - } - - if (!pSource) { - // Sometimes when performing an explain, or using $group as the merge point, 'pSource' will - // not be set. - return boost::none; - } - - BSONObjSet sorts = pSource->getOutputSorts(); - - // 'sorts' is a BSONObjSet. We need to check if our group pattern is compatible with one of the - // input sort patterns. - - // We will only attempt to take advantage of a sorted input stream if the _id given to the - // $group contained only FieldPaths or constants. Determine if this is the case, and extract - // those FieldPaths if it is. - DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); // We don't support streaming - // based off a text score. - for (auto&& exp : _idExpressions) { - if (dynamic_cast<ExpressionConstant*>(exp.get())) { - continue; - } - ExpressionObject* obj; - if ((obj = dynamic_cast<ExpressionObject*>(exp.get()))) { - // We can only perform an optimization if there are no operators in the _id expression. - if (!containsOnlyFieldPathsAndConstants(obj)) { - return boost::none; - } - } else if (!dynamic_cast<ExpressionFieldPath*>(exp.get())) { - return boost::none; - } - exp->addDependencies(&deps); - } - - if (deps.needWholeDocument) { - // We don't swap to streaming if we need the entire document, which is likely because of - // $$ROOT. - return boost::none; - } - - if (deps.fields.empty()) { - // Our _id field is constant, so we should stream, but the input sort we choose is - // irrelevant since we will output only one document. - return BSONObj(); - } - - for (auto&& obj : sorts) { - // Note that a sort order of, e.g., {a: 1, b: 1, c: 1} allows us to do a non-blocking group - // for every permutation of group by (a, b, c), since we are guaranteed that documents with - // the same value of (a, b, c) will be consecutive in the input stream, no matter what our - // _id is. - auto fieldNames = obj.getFieldNames<std::set<std::string>>(); - if (fieldNames == deps.fields) { - return obj; - } - } - - return boost::none; -} - -BSONObjSet DocumentSourceGroup::getOutputSorts() { - if (!_initialized) { - initialize(); // Note this might not finish initializing, but that's OK. We just want to - // do some initialization to try to determine if we are streaming or spilled. - // False negatives are OK. - } - - if (!(_streaming || _spilled)) { - return SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - } - - BSONObjBuilder sortOrder; - - if (_idFieldNames.empty()) { - if (_spilled) { - sortOrder.append("_id", 1); - } else { - // We have an expression like {_id: "$a"}. Check if this is a FieldPath, and if it is, - // get the sort order out of it. - if (auto obj = dynamic_cast<ExpressionFieldPath*>(_idExpressions[0].get())) { - FieldPath _idSort = obj->getFieldPath(); - - sortOrder.append( - "_id", - _inputSort.getIntField(_idSort.getFieldName(_idSort.getPathLength() - 1))); - } - } - } else if (_streaming) { - // At this point, we know that _streaming is true, so _id must have only contained - // ExpressionObjects, ExpressionConstants or ExpressionFieldPaths. We now process each - // '_idExpression'. - - // We populate 'fieldMap' such that each key is a field the input is sorted by, and the - // value is where that input field is located within the _id document. For example, if our - // _id object is {_id: {x: {y: "$a.b"}}}, 'fieldMap' would be: {'a.b': '_id.x.y'}. - StringMap<std::string> fieldMap; - for (size_t i = 0; i < _idFieldNames.size(); i++) { - intrusive_ptr<Expression> exp = _idExpressions[i]; - if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) { - // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}. - getFieldPathMap(obj, "_id." + _idFieldNames[i], &fieldMap); - } else if (auto fieldPath = dynamic_cast<ExpressionFieldPath*>(exp.get())) { - FieldPath _idSort = fieldPath->getFieldPath(); - fieldMap[_idSort.getFieldName(_idSort.getPathLength() - 1)] = - "_id." + _idFieldNames[i]; - } - } - - // Because the order of '_inputSort' is important, we go through each field we are sorted on - // and append it to the BSONObjBuilder in order. - for (BSONElement sortField : _inputSort) { - std::string sortString = sortField.fieldNameStringData().toString(); - - auto itr = fieldMap.find(sortString); - - // If our sort order is (a, b, c), we could not have converted to a streaming $group if - // our _id was predicated on (a, c) but not 'b'. Verify that this is true. - invariant(itr != fieldMap.end()); - - sortOrder.append(itr->second, _inputSort.getIntField(sortString)); - } - } else { - // We are blocking and have spilled to disk. - std::vector<std::string> outputSort; - for (size_t i = 0; i < _idFieldNames.size(); i++) { - intrusive_ptr<Expression> exp = _idExpressions[i]; - if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) { - // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}. - getFieldPathListForSpilled(obj, "_id." + _idFieldNames[i], &outputSort); - } else { - outputSort.push_back("_id." + _idFieldNames[i]); - } - } - for (auto&& field : outputSort) { - sortOrder.append(field, 1); - } - } - - return allPrefixes(sortOrder.obj()); -} - - Value DocumentSourceGroup::computeId(const Document& root) { // If only one expression, return result directly if (_idExpressions.size() == 1) { |