/** * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "document_source.h" #include "mongo/base/init.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/extensions_callback_disallow_extensions.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/stdx/memory.h" namespace mongo { using boost::intrusive_ptr; using std::unique_ptr; namespace dps = ::mongo::dotted_path_support; REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson); const char* DocumentSourceGraphLookUp::getSourceName() const { return "$graphLookup"; } DocumentSource::GetNextResult DocumentSourceGraphLookUp::getNext() { pExpCtx->checkForInterrupt(); if (_unwind) { return getNextUnwound(); } // We aren't handling a $unwind, process the input document normally. auto input = pSource->getNext(); if (!input.isAdvanced()) { return input; } _input = input.releaseDocument(); performSearch(); std::vector results; while (!_visited.empty()) { // Remove elements one at a time to avoid consuming more memory. auto it = _visited.begin(); results.push_back(Value(it->second)); _visited.erase(it); } MutableDocument output(*_input); output.setNestedField(_as, Value(std::move(results))); _visitedUsageBytes = 0; invariant(_visited.empty()); return output.freeze(); } DocumentSource::GetNextResult DocumentSourceGraphLookUp::getNextUnwound() { const boost::optional indexPath((*_unwind)->indexPath()); // If the unwind is not preserving empty arrays, we might have to process multiple inputs before // we get one that will produce an output. while (true) { if (_visited.empty()) { // No results are left for the current input, so we should move on to the next one and // perform a new search. auto input = pSource->getNext(); if (!input.isAdvanced()) { return input; } _input = input.releaseDocument(); performSearch(); _visitedUsageBytes = 0; _outputIndex = 0; } MutableDocument unwound(*_input); if (_visited.empty()) { if ((*_unwind)->preserveNullAndEmptyArrays()) { // Since "preserveNullAndEmptyArrays" was specified, output a document even though // we had no result. unwound.setNestedField(_as, Value()); if (indexPath) { unwound.setNestedField(*indexPath, Value(BSONNULL)); } } else { // $unwind would not output anything, since the '_as' field would not exist. We // should loop until we have something to return. continue; } } else { auto it = _visited.begin(); unwound.setNestedField(_as, Value(it->second)); if (indexPath) { unwound.setNestedField(*indexPath, Value(_outputIndex)); ++_outputIndex; } _visited.erase(it); } return unwound.freeze(); } } void DocumentSourceGraphLookUp::dispose() { _cache.clear(); _frontier->clear(); _visited.clear(); pSource->dispose(); } void DocumentSourceGraphLookUp::doBreadthFirstSearch() { long long depth = 0; bool shouldPerformAnotherQuery; do { shouldPerformAnotherQuery = false; // Check whether each key in the frontier exists in the cache or needs to be queried. BSONObjSet cached = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); auto matchStage = makeMatchStageFromFrontier(&cached); ValueUnorderedSet queried = pExpCtx->getValueComparator().makeUnorderedValueSet(); _frontier->swap(queried); _frontierUsageBytes = 0; // Process cached values, populating '_frontier' for the next iteration of search. while (!cached.empty()) { auto it = cached.begin(); shouldPerformAnotherQuery = addToVisitedAndFrontier(*it, depth) || shouldPerformAnotherQuery; cached.erase(it); checkMemoryUsage(); } if (matchStage) { // Query for all keys that were in the frontier and not in the cache, populating // '_frontier' for the next iteration of search. // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = *matchStage; auto pipeline = uassertStatusOK(_mongod->makePipeline(_fromPipeline, _fromExpCtx)); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() << "Documents in the '" << _from.ns() << "' namespace must contain an _id for de-duplication in $graphLookup", !(*next)["_id"].missing()); BSONObj result = next->toBson(); shouldPerformAnotherQuery = addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery; addToCache(result, queried); } checkMemoryUsage(); } ++depth; } while (shouldPerformAnotherQuery && depth < std::numeric_limits::max() && (!_maxDepth || depth <= *_maxDepth)); _frontier->clear(); _frontierUsageBytes = 0; } namespace { BSONObj addDepthFieldToObject(const std::string& field, long long depth, BSONObj object) { BSONObjBuilder bob; bob.appendElements(object); bob.append(field, depth); return bob.obj(); } } // namespace bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(BSONObj result, long long depth) { Value _id = Value(result.getField("_id")); if (_visited.find(_id) != _visited.end()) { // We've already seen this object, don't repeat any work. return false; } // We have not seen this node before. If '_depthField' was specified, add the field to the // object. BSONObj fullObject = _depthField ? addDepthFieldToObject(_depthField->fullPath(), depth, result) : result; // Add the object to our '_visited' list. _visited[_id] = fullObject; // Update the size of '_visited' appropriately. _visitedUsageBytes += _id.getApproximateSize(); _visitedUsageBytes += static_cast(fullObject.objsize()); // Add the 'connectFrom' field of 'result' into '_frontier'. If the 'connectFrom' field is an // array, we treat it as connecting to multiple values, so we must add each element to // '_frontier'. BSONElementSet recurseOnValues; dps::extractAllElementsAlongPath(result, _connectFromField.fullPath(), recurseOnValues); for (auto&& elem : recurseOnValues) { Value recurseOn = Value(elem); if (recurseOn.isArray()) { for (auto&& subElem : recurseOn.getArray()) { _frontier->insert(subElem); _frontierUsageBytes += subElem.getApproximateSize(); } } else if (!recurseOn.missing()) { // Don't recurse on a missing value. _frontier->insert(recurseOn); _frontierUsageBytes += recurseOn.getApproximateSize(); } } // We inserted into _visited, so return true. return true; } void DocumentSourceGraphLookUp::addToCache(const BSONObj& result, const ValueUnorderedSet& queried) { BSONElementSet cacheByValues; dps::extractAllElementsAlongPath(result, _connectToField.fullPath(), cacheByValues); for (auto&& elem : cacheByValues) { Value cacheBy(elem); if (cacheBy.isArray()) { for (auto&& val : cacheBy.getArray()) { if (queried.find(val) != queried.end()) { _cache.insert(val.getOwned(), result.getOwned()); } } } else if (!cacheBy.missing() && queried.find(cacheBy) != queried.end()) { // It is possible that 'cacheBy' is a single value, but was not queried for. For // instance, with a connectToField of "a.b" and a document with the structure: // {a: [{b: 1}, {b: 0}]}, this document will be retrieved by querying for "{b: 1}", but // the outer for loop will split this into two separate cacheByValues. {b: 0} was not // queried for, and thus, we cannot cache under it. _cache.insert(cacheBy.getOwned(), result.getOwned()); } } } boost::optional DocumentSourceGraphLookUp::makeMatchStageFromFrontier(BSONObjSet* cached) { // Add any cached values to 'cached' and remove them from '_frontier'. for (auto it = _frontier->begin(); it != _frontier->end();) { if (auto entry = _cache[*it]) { for (auto&& obj : *entry) { cached->insert(obj); } size_t valueSize = it->getApproximateSize(); it = _frontier->erase(it); // If the cached value increased in size while in the cache, we don't want to underflow // '_frontierUsageBytes'. invariant(valueSize <= _frontierUsageBytes); _frontierUsageBytes -= valueSize; } else { it = std::next(it); } } // Create a query of the form {$and: [_additionalFilter, {_connectToField: {$in: [...]}}]}. // // We wrap the query in a $match so that it can be parsed into a DocumentSourceMatch when // constructing a pipeline to execute. BSONObjBuilder match; { BSONObjBuilder query(match.subobjStart("$match")); { BSONArrayBuilder andObj(query.subarrayStart("$and")); if (_additionalFilter) { andObj << *_additionalFilter; } { BSONObjBuilder connectToObj(andObj.subobjStart()); { BSONObjBuilder subObj(connectToObj.subobjStart(_connectToField.fullPath())); { BSONArrayBuilder in(subObj.subarrayStart("$in")); for (auto&& value : *_frontier) { in << value; } } } } } } return _frontier->empty() ? boost::none : boost::optional(match.obj()); } void DocumentSourceGraphLookUp::performSearch() { // Make sure _input is set before calling performSearch(). invariant(_input); _variables->setRoot(*_input); Value startingValue = _startWith->evaluateInternal(_variables.get()); _variables->clearRoot(); // If _startWith evaluates to an array, treat each value as a separate starting point. if (startingValue.isArray()) { for (auto value : startingValue.getArray()) { _frontier->insert(value); _frontierUsageBytes += value.getApproximateSize(); } } else { _frontier->insert(startingValue); _frontierUsageBytes += startingValue.getApproximateSize(); } doBreadthFirstSearch(); } Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); // If we are not already handling an $unwind stage internally, we can combine with the following // $unwind stage. auto nextUnwind = dynamic_cast((*std::next(itr)).get()); if (nextUnwind && !_unwind && nextUnwind->getUnwindPath() == _as.fullPath()) { _unwind = std::move(nextUnwind); container->erase(std::next(itr)); return itr; } return std::next(itr); } BSONObjSet DocumentSourceGraphLookUp::getOutputSorts() { std::set fields{_as.fullPath()}; if (_depthField) { fields.insert(_depthField->fullPath()); } if (_unwind && (*_unwind)->indexPath()) { fields.insert((*_unwind)->indexPath()->fullPath()); } return DocumentSource::truncateSortSet(pSource->getOutputSorts(), fields); } void DocumentSourceGraphLookUp::checkMemoryUsage() { // TODO SERVER-23980: Implement spilling to disk if allowDiskUse is specified. uassert(40099, "$graphLookup reached maximum memory consumption", (_visitedUsageBytes + _frontierUsageBytes) < _maxMemoryUsageBytes); _cache.evictDownTo(_maxMemoryUsageBytes - _frontierUsageBytes - _visitedUsageBytes); } void DocumentSourceGraphLookUp::serializeToArray(std::vector& array, bool explain) const { // Serialize default options. MutableDocument spec(DOC("from" << _from.coll() << "as" << _as.fullPath() << "connectToField" << _connectToField.fullPath() << "connectFromField" << _connectFromField.fullPath() << "startWith" << _startWith->serialize(false))); // depthField is optional; serialize it if it was specified. if (_depthField) { spec["depthField"] = Value(_depthField->fullPath()); } if (_maxDepth) { spec["maxDepth"] = Value(*_maxDepth); } if (_additionalFilter) { spec["restrictSearchWithMatch"] = Value(*_additionalFilter); } // If we are explaining, include an absorbed $unwind inside the $graphLookup specification. if (_unwind && explain) { const boost::optional indexPath = (*_unwind)->indexPath(); spec["unwinding"] = Value(DOC("preserveNullAndEmptyArrays" << (*_unwind)->preserveNullAndEmptyArrays() << "includeArrayIndex" << (indexPath ? Value((*indexPath).fullPath()) : Value()))); } array.push_back(Value(DOC(getSourceName() << spec.freeze()))); // If we are not explaining, the output of this method must be parseable, so serialize our // $unwind into a separate stage. if (_unwind && !explain) { (*_unwind)->serializeToArray(array); } } void DocumentSourceGraphLookUp::doInjectExpressionContext() { auto it = pExpCtx->resolvedNamespaces.find(_from.coll()); invariant(it != pExpCtx->resolvedNamespaces.end()); const auto& resolvedNamespace = it->second; _fromExpCtx = pExpCtx->copyWith(resolvedNamespace.ns); _fromPipeline = resolvedNamespace.pipeline; // We append an additional BSONObj to '_fromPipeline' as a placeholder for the $match stage // we'll eventually construct from the input document. _fromPipeline.reserve(_fromPipeline.size() + 1); _fromPipeline.push_back(BSONObj()); _frontier = pExpCtx->getValueComparator().makeUnorderedValueSet(); _cache.setValueComparator(pExpCtx->getValueComparator()); } void DocumentSourceGraphLookUp::doDetachFromOperationContext() { _fromExpCtx->opCtx = nullptr; } void DocumentSourceGraphLookUp::doReattachToOperationContext(OperationContext* opCtx) { _fromExpCtx->opCtx = opCtx; } DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( NamespaceString from, std::string as, std::string connectFromField, std::string connectToField, boost::intrusive_ptr startWith, boost::optional additionalFilter, boost::optional depthField, boost::optional maxDepth, const boost::intrusive_ptr& expCtx) : DocumentSourceNeedsMongod(expCtx), _from(std::move(from)), _as(std::move(as)), _connectFromField(std::move(connectFromField)), _connectToField(std::move(connectToField)), _startWith(std::move(startWith)), _additionalFilter(additionalFilter), _depthField(depthField), _maxDepth(maxDepth), _visited(ValueComparator::kInstance.makeUnorderedValueMap()), _cache(expCtx->getValueComparator()) {} intrusive_ptr DocumentSourceGraphLookUp::create( const intrusive_ptr& expCtx, NamespaceString fromNs, std::string asField, std::string connectFromField, std::string connectToField, intrusive_ptr startWith, boost::optional additionalFilter, boost::optional depthField, boost::optional maxDepth) { intrusive_ptr source( new DocumentSourceGraphLookUp(std::move(fromNs), std::move(asField), std::move(connectFromField), std::move(connectToField), std::move(startWith), additionalFilter, depthField, maxDepth, expCtx)); source->_variables.reset(new Variables()); source->injectExpressionContext(expCtx); return source; } intrusive_ptr DocumentSourceGraphLookUp::createFromBson( BSONElement elem, const boost::intrusive_ptr& expCtx) { NamespaceString from; std::string as; boost::intrusive_ptr startWith; std::string connectFromField; std::string connectToField; boost::optional depthField; boost::optional maxDepth; boost::optional additionalFilter; VariablesIdGenerator idGenerator; VariablesParseState vps(&idGenerator); for (auto&& argument : elem.Obj()) { const auto argName = argument.fieldNameStringData(); if (argName == "startWith") { startWith = Expression::parseOperand(argument, vps); continue; } else if (argName == "maxDepth") { uassert(40100, str::stream() << "maxDepth must be numeric, found type: " << typeName(argument.type()), argument.isNumber()); maxDepth = argument.safeNumberLong(); uassert(40101, str::stream() << "maxDepth requires a nonnegative argument, found: " << *maxDepth, *maxDepth >= 0); uassert(40102, str::stream() << "maxDepth could not be represented as a long long: " << *maxDepth, *maxDepth == argument.number()); continue; } else if (argName == "restrictSearchWithMatch") { uassert(40185, str::stream() << "restrictSearchWithMatch must be an object, found " << typeName(argument.type()), argument.type() == Object); // We don't need to keep ahold of the MatchExpression, but we do need to ensure that // the specified object is parseable. auto parsedMatchExpression = MatchExpressionParser::parse( argument.embeddedObject(), ExtensionsCallbackDisallowExtensions(), nullptr); uassert(40186, str::stream() << "Failed to parse 'restrictSearchWithMatch' option to $graphLookup: " << parsedMatchExpression.getStatus().reason(), parsedMatchExpression.isOK()); uassert(40187, str::stream() << "Failed to parse 'restrictSearchWithMatch' option to $graphLookup: " << "$near not supported.", !QueryPlannerCommon::hasNode(parsedMatchExpression.getValue().get(), MatchExpression::GEO_NEAR)); additionalFilter = argument.embeddedObject().getOwned(); continue; } if (argName == "from" || argName == "as" || argName == "connectFromField" || argName == "depthField" || argName == "connectToField") { // All remaining arguments to $graphLookup are expected to be strings. uassert(40103, str::stream() << "expected string as argument for " << argName << ", found: " << argument.toString(false, false), argument.type() == String); } if (argName == "from") { from = NamespaceString(expCtx->ns.db().toString() + '.' + argument.String()); } else if (argName == "as") { as = argument.String(); } else if (argName == "connectFromField") { connectFromField = argument.String(); } else if (argName == "connectToField") { connectToField = argument.String(); } else if (argName == "depthField") { depthField = boost::optional(FieldPath(argument.String())); } else { uasserted(40104, str::stream() << "Unknown argument to $graphLookup: " << argument.fieldName()); } } const bool isMissingRequiredField = from.ns().empty() || as.empty() || !startWith || connectFromField.empty() || connectToField.empty(); uassert(40105, str::stream() << "$graphLookup requires 'from', 'as', 'startWith', 'connectFromField', " << "and 'connectToField' to be specified.", !isMissingRequiredField); intrusive_ptr newSource( new DocumentSourceGraphLookUp(std::move(from), std::move(as), std::move(connectFromField), std::move(connectToField), std::move(startWith), additionalFilter, depthField, maxDepth, expCtx)); newSource->_variables.reset(new Variables(idGenerator.getIdCount())); return std::move(newSource); } } // namespace mongo