summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_group.cpp
diff options
context:
space:
mode:
authorBenjamin Murphy <benjamin_murphy@me.com>2016-02-10 17:24:21 -0500
committerBenjamin Murphy <benjamin_murphy@me.com>2016-03-24 11:05:32 -0400
commitf4bbde02bab191cdba4195ec9ad73c60d4aece41 (patch)
treece812891454e6d5ed946813452685f7590b8efbf /src/mongo/db/pipeline/document_source_group.cpp
parentf40294818ce8690f1a485ca32ea52e33e137b7ea (diff)
downloadmongo-f4bbde02bab191cdba4195ec9ad73c60d4aece41.tar.gz
SERVER-4507 Group stages now take advantage of sorted input sequences.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp435
1 files changed, 357 insertions, 78 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 5a98ccbabec..1d33a337024 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -28,7 +28,6 @@
#include "mongo/platform/basic.h"
-
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
@@ -53,82 +52,125 @@ const char* DocumentSourceGroup::getSourceName() const {
boost::optional<Document> DocumentSourceGroup::getNext() {
pExpCtx->checkForInterrupt();
- if (!populated)
- populate();
+ if (!_initialized)
+ initialize();
- if (_spilled) {
- if (!_sorterIterator)
- return boost::none;
+ for (auto&& accum : _currentAccumulators) {
+ accum->reset(); // Prep accumulators for a new group.
+ }
- const size_t numAccumulators = vpAccumulatorFactory.size();
- for (size_t i = 0; i < numAccumulators; i++) {
- _currentAccumulators[i]->reset(); // prep accumulators for a new group
- }
+ if (_spilled) {
+ return getNextSpilled();
+ } else if (_streaming) {
+ return getNextStreaming();
+ } else {
+ return getNextStandard();
+ }
+}
- _currentId = _firstPartOfNextGroup.first;
- while (_currentId == _firstPartOfNextGroup.first) {
- // Inside of this loop, _firstPartOfNextGroup is the current data being processed.
- // At loop exit, it is the first value to be processed in the next group.
-
- switch (numAccumulators) { // mirrors switch in spill()
- case 0: // no Accumulators so no Values
- break;
-
- case 1: // single accumulators serialize as a single Value
- _currentAccumulators[0]->process(_firstPartOfNextGroup.second,
- /*merging=*/true);
- break;
-
- default: { // multiple accumulators serialize as an array
- const vector<Value>& accumulatorStates =
- _firstPartOfNextGroup.second.getArray();
- for (size_t i = 0; i < numAccumulators; i++) {
- _currentAccumulators[i]->process(accumulatorStates[i],
- /*merging=*/true);
- }
- break;
- }
- }
+boost::optional<Document> DocumentSourceGroup::getNextSpilled() {
+ // We aren't streaming, and we have spilled to disk.
+ if (!_sorterIterator)
+ return boost::none;
- if (!_sorterIterator->more()) {
- dispose();
+ _currentId = _firstPartOfNextGroup.first;
+ const size_t numAccumulators = vpAccumulatorFactory.size();
+ while (_currentId == _firstPartOfNextGroup.first) {
+ // Inside of this loop, _firstPartOfNextGroup is the current data being processed.
+ // At loop exit, it is the first value to be processed in the next group.
+ switch (numAccumulators) { // mirrors switch in spill()
+ case 1: // Single accumulators serialize as a single Value.
+ _currentAccumulators[0]->process(_firstPartOfNextGroup.second, true);
+ case 0: // No accumulators so no Values.
break;
+ default: { // Multiple accumulators serialize as an array of Values.
+ const vector<Value>& accumulatorStates = _firstPartOfNextGroup.second.getArray();
+ for (size_t i = 0; i < numAccumulators; i++) {
+ _currentAccumulators[i]->process(accumulatorStates[i], true);
+ }
}
+ }
- _firstPartOfNextGroup = _sorterIterator->next();
+ if (!_sorterIterator->more()) {
+ dispose();
+ break;
}
- return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
+ _firstPartOfNextGroup = _sorterIterator->next();
+ }
- } else {
- if (groups.empty())
- return boost::none;
+ return makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
+}
- Document out =
- makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard);
+boost::optional<Document> DocumentSourceGroup::getNextStandard() {
+ // Not spilled, and not streaming.
+ if (groups.empty())
+ return boost::none;
- if (++groupsIterator == groups.end())
- dispose();
+ Document out = makeDocument(groupsIterator->first, groupsIterator->second, pExpCtx->inShard);
+
+ if (++groupsIterator == groups.end())
+ dispose();
+
+ return out;
+}
- return out;
+boost::optional<Document> DocumentSourceGroup::getNextStreaming() {
+ // Streaming optimization is active.
+ if (!_firstDocOfNextGroup) {
+ dispose();
+ return boost::none;
}
+
+ Value id;
+ do {
+ // Add to the current accumulator(s).
+ for (size_t i = 0; i < _currentAccumulators.size(); i++) {
+ _currentAccumulators[i]->process(vpExpression[i]->evaluate(_variables.get()),
+ _doingMerge);
+ }
+
+ // Release our references to the previous input document before asking for the next. This
+ // makes operations like $unwind more efficient.
+ _variables->clearRoot();
+ _firstDocOfNextGroup = {};
+
+ // Retrieve the next document.
+ _firstDocOfNextGroup = pSource->getNext();
+ if (!_firstDocOfNextGroup) {
+ break;
+ }
+
+ _variables->setRoot(*_firstDocOfNextGroup);
+
+ // Compute the id. If it does not match _currentId, we will exit the loop, leaving
+ // _firstDocOfNextGroup set for the next time getNext() is called.
+ id = computeId(_variables.get());
+ } while (_currentId == id);
+
+ Document out = makeDocument(_currentId, _currentAccumulators, pExpCtx->inShard);
+ _currentId = std::move(id);
+
+ return out;
}
void DocumentSourceGroup::dispose() {
- // free our resources
+ // Free our resources.
GroupsMap().swap(groups);
_sorterIterator.reset();
- // make us look done
+ // Make us look done.
groupsIterator = groups.end();
- // free our source's resources
+ _firstDocOfNextGroup = boost::none;
+
+ // Free our source's resources.
pSource->dispose();
}
intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
- // TODO if all _idExpressions are ExpressionConstants after optimization, then we know there
- // will only be one group. We should take advantage of that to avoid going through the hash
+ // TODO: If all _idExpressions are ExpressionConstants after optimization, then we know there
+ // will be only one group. We should take advantage of that to avoid going through the hash
// table.
for (size_t i = 0; i < _idExpressions.size(); i++) {
_idExpressions[i] = _idExpressions[i]->optimize();
@@ -144,12 +186,12 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::optimize() {
Value DocumentSourceGroup::serialize(bool explain) const {
MutableDocument insides;
- // add the _id
+ // Add the _id.
if (_idFieldNames.empty()) {
invariant(_idExpressions.size() == 1);
insides["_id"] = _idExpressions[0]->serialize(explain);
} else {
- // decomposed document case
+ // Decomposed document case.
invariant(_idExpressions.size() == _idFieldNames.size());
MutableDocument md;
for (size_t i = 0; i < _idExpressions.size(); i++) {
@@ -158,7 +200,7 @@ Value DocumentSourceGroup::serialize(bool explain) const {
insides["_id"] = md.freezeToValue();
}
- // add the remaining fields
+ // Add the remaining fields.
const size_t n = vFieldName.size();
for (size_t i = 0; i < n; ++i) {
intrusive_ptr<Accumulator> accum = vpAccumulatorFactory[i]();
@@ -169,10 +211,12 @@ Value DocumentSourceGroup::serialize(bool explain) const {
if (_doingMerge) {
// This makes the output unparsable (with error) on pre 2.6 shards, but it will never
// be sent to old shards when this flag is true since they can't do a merge anyway.
-
insides["$doingMerge"] = Value(true);
}
+ if (explain && findRelevantInputSort()) {
+ return Value(DOC("$streamingGroup" << insides.freeze()));
+ }
return Value(DOC(getSourceName() << insides.freeze()));
}
@@ -193,17 +237,18 @@ DocumentSource::GetDepsReturn DocumentSourceGroup::getDependencies(DepsTracker*
intrusive_ptr<DocumentSourceGroup> DocumentSourceGroup::create(
const intrusive_ptr<ExpressionContext>& pExpCtx) {
- intrusive_ptr<DocumentSourceGroup> pSource(new DocumentSourceGroup(pExpCtx));
- return pSource;
+ return new DocumentSourceGroup(pExpCtx);
}
DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx),
- populated(false),
_doingMerge(false),
+ _maxMemoryUsageBytes(100 * 1024 * 1024),
+ _inputSort(BSONObj()),
+ _streaming(false),
+ _initialized(false),
_spilled(false),
- _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter),
- _maxMemoryUsageBytes(100 * 1024 * 1024) {}
+ _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inRouter) {}
void DocumentSourceGroup::addAccumulator(const std::string& fieldName,
Accumulator::Factory accumulatorFactory,
@@ -296,6 +341,9 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson(
}
namespace {
+
+using GroupsMap = DocumentSourceGroup::GroupsMap;
+
class SorterComparator {
public:
typedef pair<Value, Value> Data;
@@ -303,10 +351,107 @@ public:
return Value::compare(lhs.first, rhs.first);
}
};
+
+class SpillSTLComparator {
+public:
+ bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const {
+ return Value::compare(lhs->first, rhs->first) < 0;
+ }
+};
+
+bool containsOnlyFieldPathsAndConstants(ExpressionObject* expressionObj) {
+ for (auto&& it : expressionObj->getChildExpressions()) {
+ const intrusive_ptr<Expression>& childExp = it.second;
+ if (dynamic_cast<ExpressionFieldPath*>(childExp.get())) {
+ continue;
+ } else if (dynamic_cast<ExpressionConstant*>(childExp.get())) {
+ continue;
+ } else if (auto expObj = dynamic_cast<ExpressionObject*>(childExp.get())) {
+ if (!containsOnlyFieldPathsAndConstants(expObj)) {
+ // A nested expression was not a FieldPath or a constant.
+ return false;
+ }
+ } else {
+ // expressionObj was something other than a FieldPath, a constant, or a nested object.
+ return false;
+ }
+ }
+ return true;
+}
+
+void getFieldPathMap(ExpressionObject* expressionObj,
+ std::string prefix,
+ StringMap<std::string>* fields) {
+ // Given an expression with only constant and FieldPath leaf nodes, such as {x: {y: "$a.b"}},
+ // attempt to compute a map from each FieldPath leaf to the path of that leaf. In the example,
+ // this method would return: {"a.b" : "x.y"}.
+
+ for (auto&& it : expressionObj->getChildExpressions()) {
+ intrusive_ptr<Expression> childExp = it.second;
+ ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get());
+ ExpressionFieldPath* expPath = dynamic_cast<ExpressionFieldPath*>(childExp.get());
+
+ std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first;
+
+ if (expObj) {
+ getFieldPathMap(expObj, newPrefix, fields);
+ } else if (expPath) {
+ (*fields)[expPath->getFieldPath().tail().getPath(false)] = newPrefix;
+ }
+ }
+}
+
+void getFieldPathListForSpilled(ExpressionObject* expressionObj,
+ std::string prefix,
+ std::vector<std::string>* fields) {
+ // Given an expression, attempt to compute a vector of strings, each representing the path
+ // through the object to a leaf. For example, for the expression represented by
+ // {x: 2, y: {z: "$a.b"}}, the output would be ["x", "y.z"].
+ for (auto&& it : expressionObj->getChildExpressions()) {
+ intrusive_ptr<Expression> childExp = it.second;
+ ExpressionObject* expObj = dynamic_cast<ExpressionObject*>(childExp.get());
+
+ std::string newPrefix = prefix.empty() ? it.first : prefix + "." + it.first;
+
+ if (expObj) {
+ getFieldPathListForSpilled(expObj, newPrefix, fields);
+ } else {
+ fields->push_back(newPrefix);
+ }
+ }
}
+} // namespace
-void DocumentSourceGroup::populate() {
+void DocumentSourceGroup::initialize() {
+ _initialized = true;
const size_t numAccumulators = vpAccumulatorFactory.size();
+
+ boost::optional<BSONObj> inputSort = findRelevantInputSort();
+ if (inputSort) {
+ // We can convert to streaming.
+ _streaming = true;
+ _inputSort = *inputSort;
+
+ // Set up accumulators.
+ _currentAccumulators.reserve(numAccumulators);
+ for (size_t i = 0; i < numAccumulators; i++) {
+ _currentAccumulators.push_back(vpAccumulatorFactory[i]());
+ }
+
+ // We only need to load the first document.
+ _firstDocOfNextGroup = pSource->getNext();
+
+ if (!_firstDocOfNextGroup) {
+ return;
+ }
+
+ _variables->setRoot(*_firstDocOfNextGroup);
+
+ // Compute the _id value.
+ _currentId = computeId(_variables.get());
+ return;
+ }
+
dassert(numAccumulators == vpExpression.size());
// pushed to on spill()
@@ -329,10 +474,6 @@ void DocumentSourceGroup::populate() {
/* get the _id value */
Value id = computeId(_variables.get());
- /* treat missing values the same as NULL SERVER-4674 */
- if (id.missing())
- id = Value(BSONNULL);
-
/*
Look for the _id value in the map; if it's not there, add a
new entry with a blank accumulator.
@@ -406,17 +547,8 @@ void DocumentSourceGroup::populate() {
// start the group iterator
groupsIterator = groups.begin();
}
-
- populated = true;
}
-class DocumentSourceGroup::SpillSTLComparator {
-public:
- bool operator()(const GroupsMap::value_type* lhs, const GroupsMap::value_type* rhs) const {
- return Value::compare(lhs->first, rhs->first) < 0;
- }
-};
-
shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
vector<const GroupsMap::value_type*> ptrs; // using pointers to speed sorting
ptrs.reserve(groups.size());
@@ -457,6 +589,151 @@ shared_ptr<Sorter<Value, Value>::Iterator> DocumentSourceGroup::spill() {
return shared_ptr<Sorter<Value, Value>::Iterator>(writer.done());
}
+boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
+ if (true) {
+ // Until streaming $group correctly handles nullish values, the streaming behavior is
+ // disabled. See SERVER-23318.
+ return boost::none;
+ }
+
+ if (!pSource) {
+ // Sometimes when performing an explain, or using $group as the merge point, 'pSource' will
+ // not be set.
+ return boost::none;
+ }
+
+ BSONObjSet sorts = pSource->getOutputSorts();
+
+ // 'sorts' is a BSONObjSet. We need to check if our group pattern is compatible with one of the
+ // input sort patterns.
+
+ // We will only attempt to take advantage of a sorted input stream if the _id given to the
+ // $group contained only FieldPaths or constants. Determine if this is the case, and extract
+ // those FieldPaths if it is.
+ DepsTracker deps;
+ for (auto&& exp : _idExpressions) {
+ if (dynamic_cast<ExpressionConstant*>(exp.get())) {
+ continue;
+ }
+ ExpressionObject* obj;
+ if ((obj = dynamic_cast<ExpressionObject*>(exp.get()))) {
+ // We can only perform an optimization if there are no operators in the _id expression.
+ if (!containsOnlyFieldPathsAndConstants(obj)) {
+ return boost::none;
+ }
+ } else if (!dynamic_cast<ExpressionFieldPath*>(exp.get())) {
+ return boost::none;
+ }
+ exp->addDependencies(&deps);
+ }
+
+ if (deps.needWholeDocument) {
+ // We don't swap to streaming if we need the entire document, which is likely because of
+ // $$ROOT.
+ return boost::none;
+ }
+
+ if (deps.fields.empty()) {
+ // Our _id field is constant, so we should stream, but the input sort we choose is
+ // irrelevant since we will output only one document.
+ return BSONObj();
+ }
+
+ for (auto&& obj : sorts) {
+ // Note that a sort order of, e.g., {a: 1, b: 1, c: 1} allows us to do a non-blocking group
+ // for every permutation of group by (a, b, c), since we are guaranteed that documents with
+ // the same value of (a, b, c) will be consecutive in the input stream, no matter what our
+ // _id is.
+ std::set<std::string> fieldNames;
+ obj.getFieldNames(fieldNames);
+ if (fieldNames == deps.fields) {
+ return obj;
+ }
+ }
+
+ return boost::none;
+}
+
+BSONObjSet DocumentSourceGroup::getOutputSorts() {
+ if (!_initialized) {
+ initialize();
+ }
+
+ if (!(_streaming || _spilled)) {
+ return BSONObjSet();
+ }
+
+ BSONObjBuilder sortOrder;
+
+ if (_idFieldNames.empty()) {
+ if (_spilled) {
+ sortOrder.append("_id", 1);
+ } else {
+ // We have an expression like {_id: "$a"}. Check if this is a FieldPath, and if it is,
+ // get the sort order out of it.
+ if (auto obj = dynamic_cast<ExpressionFieldPath*>(_idExpressions[0].get())) {
+ FieldPath _idSort = obj->getFieldPath();
+
+ sortOrder.append(
+ "_id",
+ _inputSort.getIntField(_idSort.getFieldName(_idSort.getPathLength() - 1)));
+ }
+ }
+ } else if (_streaming) {
+ // At this point, we know that _streaming is true, so _id must have only contained
+ // ExpressionObjects, ExpressionConstants or ExpressionFieldPaths. We now process each
+ // '_idExpression'.
+
+ // We populate 'fieldMap' such that each key is a field the input is sorted by, and the
+ // value is where that input field is located within the _id document. For example, if our
+ // _id object is {_id: {x: {y: "$a.b"}}}, 'fieldMap' would be: {'a.b': '_id.x.y'}.
+ StringMap<std::string> fieldMap;
+ for (size_t i = 0; i < _idFieldNames.size(); i++) {
+ intrusive_ptr<Expression> exp = _idExpressions[i];
+ if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) {
+ // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}.
+ getFieldPathMap(obj, "_id." + _idFieldNames[i], &fieldMap);
+ } else if (auto fieldPath = dynamic_cast<ExpressionFieldPath*>(exp.get())) {
+ FieldPath _idSort = fieldPath->getFieldPath();
+ fieldMap[_idSort.getFieldName(_idSort.getPathLength() - 1)] =
+ "_id." + _idFieldNames[i];
+ }
+ }
+
+ // Because the order of '_inputSort' is important, we go through each field we are sorted on
+ // and append it to the BSONObjBuilder in order.
+ for (BSONElement sortField : _inputSort) {
+ std::string sortString = sortField.fieldNameStringData().toString();
+
+ auto itr = fieldMap.find(sortString);
+
+ // If our sort order is (a, b, c), we could not have converted to a streaming $group if
+ // our _id was predicated on (a, c) but not 'b'. Verify that this is true.
+ invariant(itr != fieldMap.end());
+
+ sortOrder.append(itr->second, _inputSort.getIntField(sortString));
+ }
+ } else {
+ // We are blocking and have spilled to disk.
+ std::vector<std::string> outputSort;
+ for (size_t i = 0; i < _idFieldNames.size(); i++) {
+ intrusive_ptr<Expression> exp = _idExpressions[i];
+ if (auto obj = dynamic_cast<ExpressionObject*>(exp.get())) {
+ // _id is an object containing a nested document, such as: {_id: {x: {y: "$b"}}}.
+ getFieldPathListForSpilled(obj, "_id." + _idFieldNames[i], &outputSort);
+ } else {
+ outputSort.push_back("_id." + _idFieldNames[i]);
+ }
+ }
+ for (auto&& field : outputSort) {
+ sortOrder.append(field, 1);
+ }
+ }
+
+ return allPrefixes(sortOrder.obj());
+}
+
+
void DocumentSourceGroup::parseIdExpression(BSONElement groupField,
const VariablesParseState& vps) {
if (groupField.type() == Object && !groupField.Obj().isEmpty()) {
@@ -469,7 +746,7 @@ void DocumentSourceGroup::parseIdExpression(BSONElement groupField,
_idExpressions.push_back(Expression::parseObject(idKeyObj, &oCtx, vps));
} else {
// grouping on an "artificial" object. Rather than create the object for each input
- // in populate(), instead group on the output of the raw expressions. The artificial
+ // in initialize(), instead group on the output of the raw expressions. The artificial
// object will be created at the end in makeDocument() while outputting results.
BSONForEach(field, idKeyObj) {
uassert(17390,
@@ -490,9 +767,11 @@ void DocumentSourceGroup::parseIdExpression(BSONElement groupField,
}
Value DocumentSourceGroup::computeId(Variables* vars) {
- // If only one expression return result directly
- if (_idExpressions.size() == 1)
- return _idExpressions[0]->evaluate(vars);
+ // If only one expression, return result directly
+ if (_idExpressions.size() == 1) {
+ Value retValue = _idExpressions[0]->evaluate(vars);
+ return retValue.missing() ? Value(BSONNULL) : std::move(retValue);
+ }
// Multiple expressions get results wrapped in a vector
vector<Value> vals;