summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/document_source_group.cpp
diff options
context:
space:
mode:
author: David Storch <david.storch@10gen.com> 2019-02-26 17:51:45 -0500
committer: David Storch <david.storch@10gen.com> 2019-03-11 10:14:57 -0400
commit: c0492bf5b77542cbacd73bcfe1e0c999b8078c07 (patch)
tree: 47735a9c7aaa58b9af8d646be99700be1426ada9 /src/mongo/db/pipeline/document_source_group.cpp
parent: 9a7cfb73da3a86d1c20f674140f1f908e2bae0c8 (diff)
download: mongo-c0492bf5b77542cbacd73bcfe1e0c999b8078c07.tar.gz
SERVER-40056 Remove partial implementation of streaming $group.
The streaming $group optimization was never fully implemented, so the code was disabled. This patch removes the dead code, including DocumentSource::getOutputSorts().
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r-- src/mongo/db/pipeline/document_source_group.cpp 283
1 file changed, 0 insertions, 283 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 839945ff86b..fa006b78b0c 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -149,8 +149,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNext() {
if (_spilled) {
return getNextSpilled();
- } else if (_streaming) {
- return getNextStreaming();
} else {
return getNextStandard();
}
@@ -203,44 +201,6 @@ DocumentSource::GetNextResult DocumentSourceGroup::getNextStandard() {
return std::move(out);
}
-DocumentSource::GetNextResult DocumentSourceGroup::getNextStreaming() {
- // Streaming optimization is active.
- if (!_firstDocOfNextGroup) {
- auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced()) {
- return nextInput;
- }
- _firstDocOfNextGroup = nextInput.releaseDocument();
- }
-
- Value id;
- do {
- // Add to the current accumulator(s).
- for (size_t i = 0; i < _currentAccumulators.size(); i++) {
- _currentAccumulators[i]->process(
- _accumulatedFields[i].expression->evaluate(*_firstDocOfNextGroup), _doingMerge);
- }
-
- // Retrieve the next document.
- auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced()) {
- return nextInput;
- }
-
- _firstDocOfNextGroup = nextInput.releaseDocument();
-
-
- // Compute the id. If it does not match _currentId, we will exit the loop, leaving
- // _firstDocOfNextGroup set for the next time getNext() is called.
- id = computeId(*_firstDocOfNextGroup);
- } while (pExpCtx->getValueComparator().evaluate(_currentId == id));
-
- Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->needsMerge);
- _currentId = std::move(id);
-
- return std::move(out);
-}
-
void DocumentSourceGroup::doDispose() {
// Free our resources.
_groups = pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>();
@@ -248,8 +208,6 @@ void DocumentSourceGroup::doDispose() {
// Make us look done.
groupsIterator = _groups->end();
-
- _firstDocOfNextGroup = boost::none;
}
intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
@@ -298,9 +256,6 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
insides["$doingMerge"] = Value(true);
}
- if (explain && findRelevantInputSort()) {
- return Value(DOC("$streamingGroup" << insides.freeze()));
- }
return Value(DOC(getSourceName() << insides.freeze()));
}
@@ -360,8 +315,6 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_doingMerge(false),
_maxMemoryUsageBytes(maxMemoryUsageBytes ? *maxMemoryUsageBytes
: internalDocumentSourceGroupMaxMemoryBytes.load()),
- _inputSort(BSONObj()),
- _streaming(false),
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
@@ -494,100 +447,11 @@ public:
private:
ValueComparator _valueComparator;
};
-
-bool containsOnlyFieldPathsAndConstants(ExpressionObject* expressionObj) {
- for (auto&& it : expressionObj->getChildExpressions()) {
- const intrusive_ptr<Expression>& childExp = it.second;
- if (dynamic_cast<ExpressionFieldPath*>(childExp.get())) {
- continue;
- } else if (dynamic_cast<ExpressionConstant*>(childExp.get())) {
- continue;
- } else if (auto expObj = dynamic_cast<ExpressionObject*>(childExp.get())) {
- if (!containsOnlyFieldPathsAndConstants(expObj)) {
- // A nested expression was not a FieldPath or a constant.
- return false;
- }
- } else {
- // expressionObj was something other than a FieldPath, a constant, or a nested object.
- return false;
- }
- }
- return true;
-}
-
-void getFieldPathMap(ExpressionObject* expressionObj,
- std::string prefix,
- StringMap<std::string>* fields) {
- // Given an expression with only constant and FieldPath leaf nodes, such as {x: {y: "$a.b"}},
- // attempt to compute a map from each FieldPath leaf to the path of that leaf. In the example,
- // this method would return: {"a.b" : "x.y"}.
-
- for (auto&& it : expressionObj->getChildExpressions()) {
- intrusive_ptr<Expression> childExp = it.second;
- ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get());
- ExpressionFieldPath* expPath = dynamic_cast<ExpressionFieldPath*>(childExp.get());
-
- std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first;
-
- if (expObj) {
- getFieldPathMap(expObj, newPrefix, fields);
- } else if (expPath) {
- (*fields)[expPath->getFieldPath().tail().fullPath()] = newPrefix;
- }
- }
-}
-
-void getFieldPathListForSpilled(ExpressionObject* expressionObj,
- std::string prefix,
- std::vector<std::string>* fields) {
- // Given an expression, attempt to compute a vector of strings, each representing the path
- // through the object to a leaf. For example, for the expression represented by
- // {x: 2, y: {z: "$a.b"}}, the output would be ["x", "y.z"].
- for (auto&& it : expressionObj->getChildExpressions()) {
- intrusive_ptr<Expression> childExp = it.second;
- ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get());
-
- std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first;
-
- if (expObj) {
- getFieldPathListForSpilled(expObj, newPrefix, fields);
- } else {
- fields->push_back(newPrefix);
- }
- }
-}
} // namespace
DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
const size_t numAccumulators = _accumulatedFields.size();
- boost::optional<BSONObj> inputSort = findRelevantInputSort();
- if (inputSort) {
- // We can convert to streaming.
- _streaming = true;
- _inputSort = *inputSort;
-
- // Set up accumulators.
- _currentAccumulators.reserve(numAccumulators);
- for (auto&& accumulatedField : _accumulatedFields) {
- _currentAccumulators.push_back(accumulatedField.makeAccumulator(pExpCtx));
- }
-
- // We only need to load the first document.
- auto firstInput = pSource->getNext();
- if (!firstInput.isAdvanced()) {
- // Leave '_firstDocOfNextGroup' uninitialized and return.
- return firstInput;
- }
- _firstDocOfNextGroup = firstInput.releaseDocument();
-
- // Compute the _id value.
- _currentId = computeId(*_firstDocOfNextGroup);
- _initialized = true;
- return DocumentSource::GetNextResult::makeEOF();
- }
-
-
// Barring any pausing, this loop exhausts 'pSource' and populates '_groups'.
GetNextResult input = pSource->getNext();
for (; input.isAdvanced(); input = pSource->getNext()) {
@@ -744,153 +608,6 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
return shared_ptr<Sorter<Value, Value>::Iterator>(iteratorPtr);
}
-boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
- if (true) {
- // Until streaming $group correctly handles nullish values, the streaming behavior is
- // disabled. See SERVER-23318.
- return boost::none;
- }
-
- if (!pSource) {
- // Sometimes when performing an explain, or using $group as the merge point, 'pSource' will
- // not be set.
- return boost::none;
- }
-
- BSONObjSet sorts = pSource->getOutputSorts();
-
- // 'sorts' is a BSONObjSet. We need to check if our group pattern is compatible with one of the
- // input sort patterns.
-
- // We will only attempt to take advantage of a sorted input stream if the _id given to the
- // $group contained only FieldPaths or constants. Determine if this is the case, and extract
- // those FieldPaths if it is.
- DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); // We don't support streaming
- // based off a text score.
- for (auto&& exp : _idExpressions) {
- if (dynamic_cast<ExpressionConstant*>(exp.get())) {
- continue;
- }
- ExpressionObject* obj;
- if ((obj = dynamic_cast<ExpressionObject*>(exp.get()))) {
- // We can only perform an optimization if there are no operators in the _id expression.
- if (!containsOnlyFieldPathsAndConstants(obj)) {
- return boost::none;
- }
- } else if (!dynamic_cast<ExpressionFieldPath*>(exp.get())) {
- return boost::none;
- }
- exp->addDependencies(&deps);
- }
-
- if (deps.needWholeDocument) {
- // We don't swap to streaming if we need the entire document, which is likely because of
- // $$ROOT.
- return boost::none;
- }
-
- if (deps.fields.empty()) {
- // Our _id field is constant, so we should stream, but the input sort we choose is
- // irrelevant since we will output only one document.
- return BSONObj();
- }
-
- for (auto&& obj : sorts) {
- // Note that a sort order of, e.g., {a: 1, b: 1, c: 1} allows us to do a non-blocking group
- // for every permutation of group by (a, b, c), since we are guaranteed that documents with
- // the same value of (a, b, c) will be consecutive in the input stream, no matter what our
- // _id is.
- auto fieldNames = obj.getFieldNames<std::set<std::string>>();
- if (fieldNames == deps.fields) {
- return obj;
- }
- }
-
- return boost::none;
-}
-
-BSONObjSet DocumentSourceGroup::getOutputSorts() {
- if (!_initialized) {
- initialize(); // Note this might not finish initializing, but that's OK. We just want to
- // do some initialization to try to determine if we are streaming or spilled.
- // False negatives are OK.
- }
-
- if (!(_streaming || _spilled)) {
- return SimpleBSONObjComparator::kInstance.makeBSONObjSet();
- }
-
- BSONObjBuilder sortOrder;
-
- if (_idFieldNames.empty()) {
- if (_spilled) {
- sortOrder.append("_id", 1);
- } else {
- // We have an expression like {_id: "$a"}. Check if this is a FieldPath, and if it is,
- // get the sort order out of it.
- if (auto obj = dynamic_cast<ExpressionFieldPath*>(_idExpressions[0].get())) {
- FieldPath _idSort = obj->getFieldPath();
-
- sortOrder.append(
- "_id",
- _inputSort.getIntField(_idSort.getFieldName(_idSort.getPathLength() - 1)));
- }
- }
- } else if (_streaming) {
- // At this point, we know that _streaming is true, so _id must have only contained
- // ExpressionObjects, ExpressionConstants or ExpressionFieldPaths. We now process each
- // '_idExpression'.
-
- // We populate 'fieldMap' such that each key is a field the input is sorted by, and the
- // value is where that input field is located within the _id document. For example, if our
- // _id object is {_id: {x: {y: "$a.b"}}}, 'fieldMap' would be: {'a.b': '_id.x.y'}.
- StringMap<std::string> fieldMap;
- for (size_t i = 0; i < _idFieldNames.size(); i++) {
- intrusive_ptr<Expression> exp = _idExpressions[i];
- if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) {
- // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}.
- getFieldPathMap(obj, "_id." + _idFieldNames[i], &fieldMap);
- } else if (auto fieldPath = dynamic_cast<ExpressionFieldPath*>(exp.get())) {
- FieldPath _idSort = fieldPath->getFieldPath();
- fieldMap[_idSort.getFieldName(_idSort.getPathLength() - 1)] =
- "_id." + _idFieldNames[i];
- }
- }
-
- // Because the order of '_inputSort' is important, we go through each field we are sorted on
- // and append it to the BSONObjBuilder in order.
- for (BSONElement sortField : _inputSort) {
- std::string sortString = sortField.fieldNameStringData().toString();
-
- auto itr = fieldMap.find(sortString);
-
- // If our sort order is (a, b, c), we could not have converted to a streaming $group if
- // our _id was predicated on (a, c) but not 'b'. Verify that this is true.
- invariant(itr != fieldMap.end());
-
- sortOrder.append(itr->second, _inputSort.getIntField(sortString));
- }
- } else {
- // We are blocking and have spilled to disk.
- std::vector<std::string> outputSort;
- for (size_t i = 0; i < _idFieldNames.size(); i++) {
- intrusive_ptr<Expression> exp = _idExpressions[i];
- if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) {
- // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}.
- getFieldPathListForSpilled(obj, "_id." + _idFieldNames[i], &outputSort);
- } else {
- outputSort.push_back("_id." + _idFieldNames[i]);
- }
- }
- for (auto&& field : outputSort) {
- sortOrder.append(field, 1);
- }
- }
-
- return allPrefixes(sortOrder.obj());
-}
-
-
Value DocumentSourceGroup::computeId(const Document& root) {
// If only one expression, return result directly
if (_idExpressions.size() == 1) {