diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_graph_lookup.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.cpp | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp new file mode 100644 index 00000000000..f2821b9107c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -0,0 +1,494 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "document_source.h" + +#include "mongo/base/init.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::unique_ptr; + +REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson); + +const char* DocumentSourceGraphLookUp::getSourceName() const { + return "$graphLookup"; +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNext() { + pExpCtx->checkForInterrupt(); + + uassert( + 40106, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from)); + + if (_unwind) { + return getNextUnwound(); + } + + // We aren't handling a $unwind, process the input document normally. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + + std::vector<Value> results; + while (!_visited.empty()) { + // Remove elements one at a time to avoid consuming more memory. + auto it = _visited.begin(); + results.push_back(Value(it->second)); + _visited.erase(it); + } + + MutableDocument output(*_input); + output.setNestedField(_as, Value(std::move(results))); + + _visitedUsageBytes = 0; + + invariant(_visited.empty()); + + return output.freeze(); +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNextUnwound() { + const boost::optional<FieldPath> indexPath((*_unwind)->indexPath()); + + // If the unwind is not preserving empty arrays, we might have to process multiple inputs before + // we get one that will produce an output. + while (true) { + if (_visited.empty()) { + // No results are left for the current input, so we should move on to the next one and + // perform a new search. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + _visitedUsageBytes = 0; + _outputIndex = 0; + } + MutableDocument unwound(*_input); + + if (_visited.empty()) { + if ((*_unwind)->preserveNullAndEmptyArrays()) { + // Since "preserveNullAndEmptyArrays" was specified, output a document even though + // we had no result. + unwound.setNestedField(_as, Value()); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(BSONNULL)); + } + } else { + // $unwind would not output anything, since the '_as' field would not exist. We + // should loop until we have something to return. + continue; + } + } else { + auto it = _visited.begin(); + unwound.setNestedField(_as, Value(it->second)); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(_outputIndex)); + ++_outputIndex; + } + _visited.erase(it); + } + + return unwound.freeze(); + } +} + +void DocumentSourceGraphLookUp::dispose() { + _cache.clear(); + _frontier.clear(); + _visited.clear(); + pSource->dispose(); +} + +void DocumentSourceGraphLookUp::doBreadthFirstSearch() { + long long depth = 0; + bool shouldPerformAnotherQuery; + do { + shouldPerformAnotherQuery = false; + + // Check whether each key in the frontier exists in the cache or needs to be queried. + BSONObjSet cached; + auto query = constructQuery(&cached); + + std::unordered_set<Value, Value::Hash> queried; + _frontier.swap(queried); + _frontierUsageBytes = 0; + + // Process cached values, populating '_frontier' for the next iteration of search. + while (!cached.empty()) { + auto it = cached.begin(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(*it, depth) || shouldPerformAnotherQuery; + cached.erase(it); + checkMemoryUsage(); + } + + if (query) { + // Query for all keys that were in the frontier and not in the cache, populating + // '_frontier' for the next iteration of search. + unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query); + + // Iterate the cursor. + while (cursor->more()) { + BSONObj result = cursor->nextSafe(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery; + addToCache(result, queried); + } + checkMemoryUsage(); + } + + ++depth; + } while (shouldPerformAnotherQuery && depth < std::numeric_limits<long long>::max() && + (!_maxDepth || depth <= *_maxDepth)); + + _frontier.clear(); + _frontierUsageBytes = 0; +} + +namespace { + +BSONObj addDepthFieldToObject(const std::string& field, long long depth, BSONObj object) { + BSONObjBuilder bob; + bob.appendElements(object); + bob.append(field, depth); + return bob.obj(); +} + +} // namespace + +bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(BSONObj result, long long depth) { + Value _id = Value(result.getField("_id")); + + if (_visited.find(_id) != _visited.end()) { + // We've already seen this object, don't repeat any work. + return false; + } + + // We have not seen this node before. If '_depthField' was specified, add the field to the + // object. + BSONObj fullObject = + _depthField ? addDepthFieldToObject(_depthField->getPath(false), depth, result) : result; + + // Add the object to our '_visited' list. + _visited[_id] = fullObject; + + // Update the size of '_visited' appropriately. + _visitedUsageBytes += _id.getApproximateSize(); + _visitedUsageBytes += static_cast<size_t>(fullObject.objsize()); + + // Add the 'connectFrom' field of 'result' into '_frontier'. If the 'connectFrom' field is an + // array, we treat it as connecting to multiple values, so we must add each element to + // '_frontier'. + BSONElementSet recurseOnValues; + result.getFieldsDotted(_connectFromField.getPath(false), recurseOnValues); + + for (auto&& elem : recurseOnValues) { + Value recurseOn = Value(elem); + if (recurseOn.isArray()) { + for (auto&& subElem : recurseOn.getArray()) { + _frontier.insert(subElem); + _frontierUsageBytes += subElem.getApproximateSize(); + } + } else if (!recurseOn.missing()) { + // Don't recurse on a missing value. + _frontier.insert(recurseOn); + _frontierUsageBytes += recurseOn.getApproximateSize(); + } + } + + // We inserted into _visited, so return true. + return true; +} + +void DocumentSourceGraphLookUp::addToCache(const BSONObj& result, + const unordered_set<Value, Value::Hash>& queried) { + BSONElementSet cacheByValues; + result.getFieldsDotted(_connectToField.getPath(false), cacheByValues); + + for (auto&& elem : cacheByValues) { + Value cacheBy(elem); + if (cacheBy.isArray()) { + for (auto&& val : cacheBy.getArray()) { + if (queried.find(val) != queried.end()) { + _cache.insert(val.getOwned(), result.getOwned()); + } + } + } else if (!cacheBy.missing() && queried.find(cacheBy) != queried.end()) { + // It is possible that 'cacheBy' is a single value, but was not queried for. For + // instance, with a connectToField of "a.b" and a document with the structure: + // {a: [{b: 1}, {b: 0}]}, this document will be retrieved by querying for "{b: 1}", but + // the outer for loop will split this into two separate cacheByValues. {b: 0} was not + // queried for, and thus, we cannot cache under it. + _cache.insert(cacheBy.getOwned(), result.getOwned()); + } + } +} + +boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) { + // Add any cached values to 'cached' and remove them from '_frontier'. + for (auto it = _frontier.begin(); it != _frontier.end();) { + if (auto entry = _cache[*it]) { + for (auto&& obj : *entry) { + cached->insert(obj); + } + size_t valueSize = it->getApproximateSize(); + it = _frontier.erase(it); + + // If the cached value increased in size while in the cache, we don't want to underflow + // '_frontierUsageBytes'. + invariant(valueSize <= _frontierUsageBytes); + _frontierUsageBytes -= valueSize; + } else { + it = std::next(it); + } + } + + // Create a query of the form {_connectToField: {$in: [...]}}. + BSONObjBuilder query; + BSONObjBuilder subobj(query.subobjStart(_connectToField.getPath(false))); + BSONArrayBuilder in(subobj.subarrayStart("$in")); + + for (auto&& value : _frontier) { + in << value; + } + + in.doneFast(); + subobj.doneFast(); + + return _frontier.empty() ? boost::none : boost::optional<BSONObj>(query.obj()); +} + +void DocumentSourceGraphLookUp::performSearch() { + // Make sure _input is set before calling performSearch(). + invariant(_input); + + _variables->setRoot(*_input); + Value startingValue = _startWith->evaluateInternal(_variables.get()); + _variables->clearRoot(); + + // If _startWith evaluates to an array, treat each value as a separate starting point. + if (startingValue.isArray()) { + for (auto value : startingValue.getArray()) { + _frontier.insert(value); + _frontierUsageBytes += value.getApproximateSize(); + } + } else { + _frontier.insert(startingValue); + _frontierUsageBytes += startingValue.getApproximateSize(); + } + + doBreadthFirstSearch(); +} + +Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + // If we are not already handling an $unwind stage internally, we can combine with the following + // $unwind stage. + auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get()); + if (nextUnwind && !_unwind && nextUnwind->getUnwindPath() == _as.getPath(false)) { + _unwind = std::move(nextUnwind); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); +} + +BSONObjSet DocumentSourceGraphLookUp::getOutputSorts() { + std::set<std::string> fields{_as.getPath(false)}; + if (_depthField) { + fields.insert(_depthField->getPath(false)); + } + if (_unwind && (*_unwind)->indexPath()) { + fields.insert((*_unwind)->indexPath()->getPath(false)); + } + + return DocumentSource::truncateSortSet(pSource->getOutputSorts(), fields); +} + +void DocumentSourceGraphLookUp::checkMemoryUsage() { + // TODO SERVER-23980: Implement spilling to disk if allowDiskUse is specified. + uassert(40099, + "$graphLookup reached maximum memory consumption", + (_visitedUsageBytes + _frontierUsageBytes) < _maxMemoryUsageBytes); + _cache.evictDownTo(_maxMemoryUsageBytes - _frontierUsageBytes - _visitedUsageBytes); +} + +void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool explain) const { + // Serialize default options. + MutableDocument spec(DOC("from" << _from.coll() << "as" << _as.getPath(false) + << "connectToField" << _connectToField.getPath(false) + << "connectFromField" << _connectFromField.getPath(false) + << "startWith" << _startWith->serialize(false))); + + // depthField is optional; serialize it if it was specified. + if (_depthField) { + spec["depthField"] = Value(_depthField->getPath(false)); + } + + if (_maxDepth) { + spec["maxDepth"] = Value(*_maxDepth); + } + + // If we are explaining, include an absorbed $unwind inside the $graphLookup specification. + if (_unwind && explain) { + const boost::optional<FieldPath> indexPath = (*_unwind)->indexPath(); + spec["unwinding"] = + Value(DOC("preserveNullAndEmptyArrays" + << (*_unwind)->preserveNullAndEmptyArrays() << "includeArrayIndex" + << (indexPath ? Value((*indexPath).getPath(false)) : Value()))); + } + + array.push_back(Value(DOC(getSourceName() << spec.freeze()))); + + // If we are not explaining, the output of this method must be parseable, so serialize our + // $unwind into a separate stage. + if (_unwind && !explain) { + (*_unwind)->serializeToArray(array); + } +} + +DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( + NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), + _from(std::move(from)), + _as(std::move(as)), + _connectFromField(std::move(connectFromField)), + _connectToField(std::move(connectToField)), + _startWith(std::move(startWith)), + _depthField(depthField), + _maxDepth(maxDepth) {} + +intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + NamespaceString from; + std::string as; + boost::intrusive_ptr<Expression> startWith; + std::string connectFromField; + std::string connectToField; + boost::optional<FieldPath> depthField; + boost::optional<long long> maxDepth; + + VariablesIdGenerator idGenerator; + VariablesParseState vps(&idGenerator); + + for (auto&& argument : elem.Obj()) { + const auto argName = argument.fieldNameStringData(); + + if (argName == "startWith") { + startWith = Expression::parseOperand(argument, vps); + continue; + } else if (argName == "maxDepth") { + uassert(40100, + str::stream() << "maxDepth must be numeric, found type: " + << typeName(argument.type()), + argument.isNumber()); + maxDepth = argument.safeNumberLong(); + uassert( + 40101, + str::stream() << "maxDepth requires a nonnegative argument, found: " << *maxDepth, + *maxDepth >= 0); + uassert( + 40102, + str::stream() << "maxDepth could not be represented as a long long: " << *maxDepth, + *maxDepth == argument.number()); + continue; + } + + if (argName == "from" || argName == "as" || argName == "connectFromField" || + argName == "depthField" || argName == "connectToField") { + // All remaining arguments to $graphLookup are expected to be strings. + uassert(40103, + str::stream() << "expected string as argument for " << argName + << ", found: " << argument.toString(false, false), + argument.type() == String); + } + + if (argName == "from") { + from = NamespaceString(expCtx->ns.db().toString() + '.' + argument.String()); + } else if (argName == "as") { + as = argument.String(); + } else if (argName == "connectFromField") { + connectFromField = argument.String(); + } else if (argName == "connectToField") { + connectToField = argument.String(); + } else if (argName == "depthField") { + depthField = boost::optional<FieldPath>(FieldPath(argument.String())); + } else { + uasserted(40104, + str::stream() + << "Unknown argument to $graphLookup: " << argument.fieldName()); + } + } + + const bool isMissingRequiredField = from.ns().empty() || as.empty() || !startWith || + connectFromField.empty() || connectToField.empty(); + + uassert(40105, + str::stream() << "$graphLookup requires 'from', 'as', 'startWith', 'connectFromField', " + << "and 'connectToField' to be specified.", + !isMissingRequiredField); + + intrusive_ptr<DocumentSourceGraphLookUp> newSource( + new DocumentSourceGraphLookUp(std::move(from), + std::move(as), + std::move(connectFromField), + std::move(connectToField), + std::move(startWith), + depthField, + maxDepth, + expCtx)); + + newSource->_variables.reset(new Variables(idGenerator.getIdCount())); + + return std::move(newSource); +} +} // namespace mongo |