author     David Storch <david.storch@10gen.com>    2016-05-02 12:34:07 -0400
committer  David Storch <david.storch@10gen.com>    2016-05-04 09:54:45 -0400
commit     34e784dd13375bba5547d332b6aeba9a80016e63 (patch)
tree       7634db01a8a579e0583216dabaf2b1edd7b65707 /src/mongo/db/pipeline
parent     3e3313409d90026ed5f629e8ffb87b04cca5a524 (diff)
download   mongo-34e784dd13375bba5547d332b6aeba9a80016e63.tar.gz
SERVER-23725 Aggregation now supports the graphLookup stage.
This commit is identical to a1253a94b82e65780d3aa3c4ddf92db02ec0b9d1.
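For context, the new stage is built by DocumentSourceGraphLookUp::createFromBson() (added in the diff below) from a specification whose required arguments are 'from', 'as', 'startWith', 'connectFromField', and 'connectToField'; 'maxDepth' and 'depthField' are optional. A minimal sketch of constructing one follows (it is not part of the commit: the collection and field names are invented, and 'expCtx' is assumed to be a boost::intrusive_ptr<ExpressionContext> already in scope):

    // Hypothetical $graphLookup specification; all names here are illustrative.
    BSONObj spec = BSON("$graphLookup" << BSON("from" << "employees"
                                               << "startWith" << "$reportsTo"
                                               << "connectFromField" << "reportsTo"
                                               << "connectToField" << "name"
                                               << "as" << "reportingHierarchy"
                                               << "maxDepth" << 2
                                               << "depthField" << "depth"));
    // 'expCtx' is an assumed ExpressionContext for the pipeline being built.
    auto stage = DocumentSourceGraphLookUp::createFromBson(spec.firstElement(), expCtx);

For each input document the stage evaluates 'startWith', then repeatedly queries the 'from' collection by matching 'connectToField' against the current frontier of values, collecting the matched documents (annotated with their depth, if 'depthField' was given) into the 'as' array.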
Diffstat (limited to 'src/mongo/db/pipeline')
 src/mongo/db/pipeline/SConscript                       |  12
 src/mongo/db/pipeline/document_source.cpp              |  34
 src/mongo/db/pipeline/document_source.h                | 150
 src/mongo/db/pipeline/document_source_graph_lookup.cpp | 494
 src/mongo/db/pipeline/document_source_test.cpp         |  33
 src/mongo/db/pipeline/lookup_set_cache.h               | 179
 src/mongo/db/pipeline/lookup_set_cache_test.cpp        | 154
 src/mongo/db/pipeline/pipeline_d.cpp                   |  12
 src/mongo/db/pipeline/pipeline_test.cpp                |  55
9 files changed, 1118 insertions, 5 deletions
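One supporting change is worth calling out before the diff: document_source.cpp gains DocumentSource::truncateSortSet(), which cuts each known sort key at the first component that is one of the given fields or a subfield of one, since $graphLookup (and any absorbed $unwind) overwrites those paths. A sketch of its semantics, restating what the new TruncateSort unit tests below assert:

    // Truncating {a: 1, b: 1, c: 1} on fields {"b"} keeps only the prefix {a: 1};
    // a key component on a subfield such as "b.c" would be cut the same way.
    BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "c" << 1);
    BSONObjSet truncated = DocumentSource::truncateSortSet({sortKey}, {"b"});
    // 'truncated' now contains exactly one sort key: {a: 1}.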
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 56161d0cac2..72609a6d0cc 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -100,6 +100,7 @@ docSourceEnv.Library( source=[ 'document_source.cpp', 'document_source_geo_near.cpp', + 'document_source_graph_lookup.cpp', 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_limit.cpp', @@ -180,3 +181,14 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/service_context_noop_init', ], ) + +env.CppUnitTest( + target='lookup_set_cache_test', + source=[ + 'lookup_set_cache_test.cpp', + ], + LIBDEPS=[ + 'document_value', + '$BUILD_DIR/mongo/base', + ] +) diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 41c22e1ecb8..33e85baa380 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -114,4 +114,38 @@ BSONObjSet DocumentSource::allPrefixes(BSONObj obj) { return out; } + +BSONObjSet DocumentSource::truncateSortSet(const BSONObjSet& sorts, + const std::set<std::string>& fields) { + BSONObjSet out; + + for (auto&& sort : sorts) { + BSONObjBuilder outputSort; + + for (auto&& key : sort) { + auto keyName = key.fieldNameStringData(); + + bool shouldAppend = true; + for (auto&& field : fields) { + if (keyName == field || keyName.startsWith(field + '.')) { + shouldAppend = false; + break; + } + } + + if (!shouldAppend) { + break; + } + + outputSort.append(key); + } + + BSONObj outSortObj = outputSort.obj(); + if (!outSortObj.isEmpty()) { + out.insert(outSortObj); + } + } + + return out; +} } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 9ee39ac93fd..99197592d23 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -30,8 +30,6 @@ #include "mongo/platform/basic.h" -#include <boost/optional.hpp> -#include <boost/intrusive_ptr.hpp> #include <deque> #include <list> #include <string> @@ -46,10 +44,11 @@ #include "mongo/db/jsobj.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/lookup_set_cache.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/sorter/sorter.h" @@ -228,6 +227,13 @@ public: */ static BSONObjSet allPrefixes(BSONObj obj); + /** + * Given a BSONObjSet, where each BSONObj represents a sort key, return the BSONObjSet that + * results from truncating each sort key before the first path that is a member of 'fields', or + * is a child of a member of 'fields'. + */ + static BSONObjSet truncateSortSet(const BSONObjSet& sorts, const std::set<std::string>& fields); + protected: /** Base constructor. @@ -317,6 +323,8 @@ public: virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; + virtual bool hasUniqueIdIndex(const NamespaceString& ns) const = 0; + // Add new methods as needed. 
}; @@ -1535,4 +1543,138 @@ private: long long _cursorIndex = 0; boost::optional<Document> _input; }; + +class DocumentSourceGraphLookUp final : public DocumentSource, public DocumentSourceNeedsMongod { +public: + boost::optional<Document> getNext() final; + const char* getSourceName() const final; + void dispose() final; + BSONObjSet getOutputSorts() final; + void serializeToArray(std::vector<Value>& array, bool explain = false) const final; + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + _startWith->addDependencies(deps, nullptr); + return SEE_NEXT; + }; + + bool needsPrimaryShard() const final { + return true; + } + + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { + collections->push_back(_from); + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceGraphLookUp(NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + Value serialize(bool explain = false) const final { + // Should not be called; use serializeToArray instead. + MONGO_UNREACHABLE; + } + + /** + * Prepare the query to execute on the 'from' collection, using the contents of '_frontier'. + * + * Fills 'cached' with any values that were retrieved from the cache. + * + * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache. + * Otherwise, returns a query object. + */ + boost::optional<BSONObj> constructQuery(BSONObjSet* cached); + + /** + * If we have internalized a $unwind, getNext() dispatches to this function. + */ + boost::optional<Document> getNextUnwound(); + + /** + * Perform a breadth-first search of the 'from' collection. '_frontier' should already be + * populated with the values for the initial query. Populates '_visited' with the result(s) + * of the query. + */ + void doBreadthFirstSearch(); + + /** + * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a + * breadth-first search. Caller should check that _input is not boost::none. + */ + void performSearch(); + + /** + * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying + * for 'queried'. + */ + void addToCache(const BSONObj& result, const unordered_set<Value, Value::Hash>& queried); + + /** + * Assert that '_visited' and '_frontier' have not exceeded the maximum memory usage, and then + * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'. + */ + void checkMemoryUsage(); + + /** + * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier' + * with the object's 'connectFrom' values. + * + * Returns whether '_visited' was updated, and thus, whether the search should recurse. + */ + bool addToVisitedAndFrontier(BSONObj result, long long depth); + + // $graphLookup options.
+ NamespaceString _from; + FieldPath _as; + FieldPath _connectFromField; + FieldPath _connectToField; + boost::intrusive_ptr<Expression> _startWith; + boost::optional<FieldPath> _depthField; + boost::optional<long long> _maxDepth; + + size_t _maxMemoryUsageBytes = 100 * 1024 * 1024; + + // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'. + size_t _visitedUsageBytes = 0; + size_t _frontierUsageBytes = 0; + + // Only used during the breadth-first search, tracks the set of values on the current frontier. + std::unordered_set<Value, Value::Hash> _frontier; + + // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the + // document from the foreign collection, value is the document itself. + std::unordered_map<Value, BSONObj, Value::Hash> _visited; + + // Caches query results to avoid repeating any work. This structure is maintained across calls + // to getNext(). + LookupSetCache _cache; + + // When we have internalized a $unwind, we must keep track of the input document, since we will + // need it for multiple "getNext()" calls. + boost::optional<Document> _input; + + // The variables that are in scope to be used by the '_startWith' expression. + std::unique_ptr<Variables> _variables; + + // Keep track of a $unwind that was absorbed into this stage. + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind; + + // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that + // field, tracking how many results we've returned so far for the current input document. + long long _outputIndex; +}; } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp new file mode 100644 index 00000000000..f2821b9107c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -0,0 +1,494 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "document_source.h" + +#include "mongo/base/init.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::unique_ptr; + +REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson); + +const char* DocumentSourceGraphLookUp::getSourceName() const { + return "$graphLookup"; +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNext() { + pExpCtx->checkForInterrupt(); + + uassert( + 40106, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from)); + + if (_unwind) { + return getNextUnwound(); + } + + // We aren't handling a $unwind, process the input document normally. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + + std::vector<Value> results; + while (!_visited.empty()) { + // Remove elements one at a time to avoid consuming more memory. + auto it = _visited.begin(); + results.push_back(Value(it->second)); + _visited.erase(it); + } + + MutableDocument output(*_input); + output.setNestedField(_as, Value(std::move(results))); + + _visitedUsageBytes = 0; + + invariant(_visited.empty()); + + return output.freeze(); +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNextUnwound() { + const boost::optional<FieldPath> indexPath((*_unwind)->indexPath()); + + // If the unwind is not preserving empty arrays, we might have to process multiple inputs before + // we get one that will produce an output. + while (true) { + if (_visited.empty()) { + // No results are left for the current input, so we should move on to the next one and + // perform a new search. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + _visitedUsageBytes = 0; + _outputIndex = 0; + } + MutableDocument unwound(*_input); + + if (_visited.empty()) { + if ((*_unwind)->preserveNullAndEmptyArrays()) { + // Since "preserveNullAndEmptyArrays" was specified, output a document even though + // we had no result. + unwound.setNestedField(_as, Value()); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(BSONNULL)); + } + } else { + // $unwind would not output anything, since the '_as' field would not exist. We + // should loop until we have something to return. + continue; + } + } else { + auto it = _visited.begin(); + unwound.setNestedField(_as, Value(it->second)); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(_outputIndex)); + ++_outputIndex; + } + _visited.erase(it); + } + + return unwound.freeze(); + } +} + +void DocumentSourceGraphLookUp::dispose() { + _cache.clear(); + _frontier.clear(); + _visited.clear(); + pSource->dispose(); +} + +void DocumentSourceGraphLookUp::doBreadthFirstSearch() { + long long depth = 0; + bool shouldPerformAnotherQuery; + do { + shouldPerformAnotherQuery = false; + + // Check whether each key in the frontier exists in the cache or needs to be queried. + BSONObjSet cached; + auto query = constructQuery(&cached); + + std::unordered_set<Value, Value::Hash> queried; + _frontier.swap(queried); + _frontierUsageBytes = 0; + + // Process cached values, populating '_frontier' for the next iteration of search. 
+ while (!cached.empty()) { + auto it = cached.begin(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(*it, depth) || shouldPerformAnotherQuery; + cached.erase(it); + checkMemoryUsage(); + } + + if (query) { + // Query for all keys that were in the frontier and not in the cache, populating + // '_frontier' for the next iteration of search. + unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query); + + // Iterate the cursor. + while (cursor->more()) { + BSONObj result = cursor->nextSafe(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery; + addToCache(result, queried); + } + checkMemoryUsage(); + } + + ++depth; + } while (shouldPerformAnotherQuery && depth < std::numeric_limits<long long>::max() && + (!_maxDepth || depth <= *_maxDepth)); + + _frontier.clear(); + _frontierUsageBytes = 0; +} + +namespace { + +BSONObj addDepthFieldToObject(const std::string& field, long long depth, BSONObj object) { + BSONObjBuilder bob; + bob.appendElements(object); + bob.append(field, depth); + return bob.obj(); +} + +} // namespace + +bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(BSONObj result, long long depth) { + Value _id = Value(result.getField("_id")); + + if (_visited.find(_id) != _visited.end()) { + // We've already seen this object, don't repeat any work. + return false; + } + + // We have not seen this node before. If '_depthField' was specified, add the field to the + // object. + BSONObj fullObject = + _depthField ? addDepthFieldToObject(_depthField->getPath(false), depth, result) : result; + + // Add the object to our '_visited' list. + _visited[_id] = fullObject; + + // Update the size of '_visited' appropriately. + _visitedUsageBytes += _id.getApproximateSize(); + _visitedUsageBytes += static_cast<size_t>(fullObject.objsize()); + + // Add the 'connectFrom' field of 'result' into '_frontier'. If the 'connectFrom' field is an + // array, we treat it as connecting to multiple values, so we must add each element to + // '_frontier'. + BSONElementSet recurseOnValues; + result.getFieldsDotted(_connectFromField.getPath(false), recurseOnValues); + + for (auto&& elem : recurseOnValues) { + Value recurseOn = Value(elem); + if (recurseOn.isArray()) { + for (auto&& subElem : recurseOn.getArray()) { + _frontier.insert(subElem); + _frontierUsageBytes += subElem.getApproximateSize(); + } + } else if (!recurseOn.missing()) { + // Don't recurse on a missing value. + _frontier.insert(recurseOn); + _frontierUsageBytes += recurseOn.getApproximateSize(); + } + } + + // We inserted into _visited, so return true. + return true; +} + +void DocumentSourceGraphLookUp::addToCache(const BSONObj& result, + const unordered_set<Value, Value::Hash>& queried) { + BSONElementSet cacheByValues; + result.getFieldsDotted(_connectToField.getPath(false), cacheByValues); + + for (auto&& elem : cacheByValues) { + Value cacheBy(elem); + if (cacheBy.isArray()) { + for (auto&& val : cacheBy.getArray()) { + if (queried.find(val) != queried.end()) { + _cache.insert(val.getOwned(), result.getOwned()); + } + } + } else if (!cacheBy.missing() && queried.find(cacheBy) != queried.end()) { + // It is possible that 'cacheBy' is a single value, but was not queried for. For + // instance, with a connectToField of "a.b" and a document with the structure: + // {a: [{b: 1}, {b: 0}]}, this document will be retrieved by querying for "{b: 1}", but + // the outer for loop will split this into two separate cacheByValues. 
{b: 0} was not + // queried for, and thus, we cannot cache under it. + _cache.insert(cacheBy.getOwned(), result.getOwned()); + } + } +} + +boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) { + // Add any cached values to 'cached' and remove them from '_frontier'. + for (auto it = _frontier.begin(); it != _frontier.end();) { + if (auto entry = _cache[*it]) { + for (auto&& obj : *entry) { + cached->insert(obj); + } + size_t valueSize = it->getApproximateSize(); + it = _frontier.erase(it); + + // If the cached value increased in size while in the cache, we don't want to underflow + // '_frontierUsageBytes'. + invariant(valueSize <= _frontierUsageBytes); + _frontierUsageBytes -= valueSize; + } else { + it = std::next(it); + } + } + + // Create a query of the form {_connectToField: {$in: [...]}}. + BSONObjBuilder query; + BSONObjBuilder subobj(query.subobjStart(_connectToField.getPath(false))); + BSONArrayBuilder in(subobj.subarrayStart("$in")); + + for (auto&& value : _frontier) { + in << value; + } + + in.doneFast(); + subobj.doneFast(); + + return _frontier.empty() ? boost::none : boost::optional<BSONObj>(query.obj()); +} + +void DocumentSourceGraphLookUp::performSearch() { + // Make sure _input is set before calling performSearch(). + invariant(_input); + + _variables->setRoot(*_input); + Value startingValue = _startWith->evaluateInternal(_variables.get()); + _variables->clearRoot(); + + // If _startWith evaluates to an array, treat each value as a separate starting point. + if (startingValue.isArray()) { + for (auto value : startingValue.getArray()) { + _frontier.insert(value); + _frontierUsageBytes += value.getApproximateSize(); + } + } else { + _frontier.insert(startingValue); + _frontierUsageBytes += startingValue.getApproximateSize(); + } + + doBreadthFirstSearch(); +} + +Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + // If we are not already handling an $unwind stage internally, we can combine with the following + // $unwind stage. + auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get()); + if (nextUnwind && !_unwind && nextUnwind->getUnwindPath() == _as.getPath(false)) { + _unwind = std::move(nextUnwind); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); +} + +BSONObjSet DocumentSourceGraphLookUp::getOutputSorts() { + std::set<std::string> fields{_as.getPath(false)}; + if (_depthField) { + fields.insert(_depthField->getPath(false)); + } + if (_unwind && (*_unwind)->indexPath()) { + fields.insert((*_unwind)->indexPath()->getPath(false)); + } + + return DocumentSource::truncateSortSet(pSource->getOutputSorts(), fields); +} + +void DocumentSourceGraphLookUp::checkMemoryUsage() { + // TODO SERVER-23980: Implement spilling to disk if allowDiskUse is specified. + uassert(40099, + "$graphLookup reached maximum memory consumption", + (_visitedUsageBytes + _frontierUsageBytes) < _maxMemoryUsageBytes); + _cache.evictDownTo(_maxMemoryUsageBytes - _frontierUsageBytes - _visitedUsageBytes); +} + +void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool explain) const { + // Serialize default options. 
+ MutableDocument spec(DOC("from" << _from.coll() << "as" << _as.getPath(false) + << "connectToField" << _connectToField.getPath(false) + << "connectFromField" << _connectFromField.getPath(false) + << "startWith" << _startWith->serialize(false))); + + // depthField is optional; serialize it if it was specified. + if (_depthField) { + spec["depthField"] = Value(_depthField->getPath(false)); + } + + if (_maxDepth) { + spec["maxDepth"] = Value(*_maxDepth); + } + + // If we are explaining, include an absorbed $unwind inside the $graphLookup specification. + if (_unwind && explain) { + const boost::optional<FieldPath> indexPath = (*_unwind)->indexPath(); + spec["unwinding"] = + Value(DOC("preserveNullAndEmptyArrays" + << (*_unwind)->preserveNullAndEmptyArrays() << "includeArrayIndex" + << (indexPath ? Value((*indexPath).getPath(false)) : Value()))); + } + + array.push_back(Value(DOC(getSourceName() << spec.freeze()))); + + // If we are not explaining, the output of this method must be parseable, so serialize our + // $unwind into a separate stage. + if (_unwind && !explain) { + (*_unwind)->serializeToArray(array); + } +} + +DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( + NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), + _from(std::move(from)), + _as(std::move(as)), + _connectFromField(std::move(connectFromField)), + _connectToField(std::move(connectToField)), + _startWith(std::move(startWith)), + _depthField(depthField), + _maxDepth(maxDepth) {} + +intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + NamespaceString from; + std::string as; + boost::intrusive_ptr<Expression> startWith; + std::string connectFromField; + std::string connectToField; + boost::optional<FieldPath> depthField; + boost::optional<long long> maxDepth; + + VariablesIdGenerator idGenerator; + VariablesParseState vps(&idGenerator); + + for (auto&& argument : elem.Obj()) { + const auto argName = argument.fieldNameStringData(); + + if (argName == "startWith") { + startWith = Expression::parseOperand(argument, vps); + continue; + } else if (argName == "maxDepth") { + uassert(40100, + str::stream() << "maxDepth must be numeric, found type: " + << typeName(argument.type()), + argument.isNumber()); + maxDepth = argument.safeNumberLong(); + uassert( + 40101, + str::stream() << "maxDepth requires a nonnegative argument, found: " << *maxDepth, + *maxDepth >= 0); + uassert( + 40102, + str::stream() << "maxDepth could not be represented as a long long: " << *maxDepth, + *maxDepth == argument.number()); + continue; + } + + if (argName == "from" || argName == "as" || argName == "connectFromField" || + argName == "depthField" || argName == "connectToField") { + // All remaining arguments to $graphLookup are expected to be strings. + uassert(40103, + str::stream() << "expected string as argument for " << argName + << ", found: " << argument.toString(false, false), + argument.type() == String); + } + + if (argName == "from") { + from = NamespaceString(expCtx->ns.db().toString() + '.' 
+ argument.String()); + } else if (argName == "as") { + as = argument.String(); + } else if (argName == "connectFromField") { + connectFromField = argument.String(); + } else if (argName == "connectToField") { + connectToField = argument.String(); + } else if (argName == "depthField") { + depthField = boost::optional<FieldPath>(FieldPath(argument.String())); + } else { + uasserted(40104, + str::stream() + << "Unknown argument to $graphLookup: " << argument.fieldName()); + } + } + + const bool isMissingRequiredField = from.ns().empty() || as.empty() || !startWith || + connectFromField.empty() || connectToField.empty(); + + uassert(40105, + str::stream() << "$graphLookup requires 'from', 'as', 'startWith', 'connectFromField', " + << "and 'connectToField' to be specified.", + !isMissingRequiredField); + + intrusive_ptr<DocumentSourceGraphLookUp> newSource( + new DocumentSourceGraphLookUp(std::move(from), + std::move(as), + std::move(connectFromField), + std::move(connectToField), + std::move(startWith), + depthField, + maxDepth, + expCtx)); + + newSource->_variables.reset(new Variables(idGenerator.getIdCount())); + + return std::move(newSource); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index c1b16a3c90c..791c38dac59 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -85,6 +85,35 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { namespace DocumentSourceClass { using mongo::DocumentSource; +TEST(TruncateSort, SortTruncatesNormalField) { + BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "c" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + +TEST(TruncateSort, SortTruncatesOnSubfield) { + BSONObj sortKey = BSON("a" << 1 << "b.c" << 1 << "d" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + +TEST(TruncateSort, SortDoesNotTruncateOnParent) { + BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "d" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b.c"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1 << "b" << 1 << "d" << 1)), 1U); +} + +TEST(TruncateSort, TruncateSortDedupsSortCorrectly) { + BSONObj sortKeyOne = BSON("a" << 1 << "b" << 1); + BSONObj sortKeyTwo = BSON("a" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKeyOne, sortKeyTwo}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + template <size_t ArrayLen> set<string> arrayToSet(const char*(&array)[ArrayLen]) { set<string> out; @@ -164,7 +193,9 @@ public: } } }; -} + + +} // namespace DocumentSourceClass namespace Mock { using mongo::DocumentSourceMock; diff --git a/src/mongo/db/pipeline/lookup_set_cache.h b/src/mongo/db/pipeline/lookup_set_cache.h new file mode 100644 index 00000000000..a40ac28155b --- /dev/null +++ b/src/mongo/db/pipeline/lookup_set_cache.h @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/platform/basic.h" + +#include <unordered_map> +#include <unordered_set> +#include <iostream> +#include <boost/intrusive_ptr.hpp> +#include <boost/multi_index_container.hpp> +#include <boost/multi_index/hashed_index.hpp> +#include <boost/multi_index/member.hpp> +#include <boost/multi_index/sequenced_index.hpp> +#include <boost/optional.hpp> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + +using boost::multi_index_container; +using boost::multi_index::sequenced; +using boost::multi_index::hashed_unique; +using boost::multi_index::member; +using boost::multi_index::indexed_by; + +/** + * A least-recently-used cache from key to a set of values. It does not implement any default size + * limit, but includes the ability to evict down to both a specific number of elements, and down to + * a specific amount of memory. Memory usage includes only the size of the elements in the cache at + * the time of insertion, not the overhead incurred by the data structures in use. + */ +class LookupSetCache { +public: + /** + * Insert "value" into the set with key "key". If "key" is already present in the cache, move it + * to the middle of the cache. Otherwise, insert a new key in the middle of the cache. + * + * Note: In this case, "middle" refers to the sequence of the cache, where "first" is the item + * most recently used, and "last" is the item least recently used. + * + * We insert and update in the middle because when a key is updated, we can't assume that it's + * important to keep in the cache (i.e., that we should put it at the front), but it's also + * likely we don't want to evict it (i.e., we want to make sure it isn't at the back). + */ + void insert(Value key, BSONObj value) { + // Get an iterator to the middle of the container. + size_t middle = size() / 2; + auto it = _container.begin(); + std::advance(it, middle); + + auto result = _container.insert(it, {key, {value}}); + + if (!result.second) { + // We did not insert due to a duplicate key. + auto cached = *result.first; + // Update the cached value, moving it to the middle of the cache. 
+ cached.second.insert(value); + _container.replace(result.first, cached); + _container.relocate(it, result.first); + } else { + _memoryUsage += key.getApproximateSize(); + } + _memoryUsage += static_cast<size_t>(value.objsize()); + } + + /** + * Evict the least-recently-used item. + */ + void evictOne() { + if (_container.empty()) { + return; + } + + const Cached& pair = _container.back(); + + size_t keySize = pair.first.getApproximateSize(); + invariant(keySize <= _memoryUsage); + _memoryUsage -= keySize; + + for (auto&& elem : pair.second) { + size_t valueSize = static_cast<size_t>(elem.objsize()); + invariant(valueSize <= _memoryUsage); + _memoryUsage -= valueSize; + } + _container.erase(std::prev(_container.end())); + } + + /** + * Evicts from the cache until there are 'num' items remaining. + */ + void evictUntilSize(size_t num) { + while (size() > num) { + evictOne(); + } + } + + /** + * Returns the number of elements in the cache. + */ + size_t size() const { + return _container.size(); + } + + /** + * Evict items in LRU order until the cache's size is less than or equal to "maximum". + */ + void evictDownTo(size_t maximum) { + while (_memoryUsage > maximum && !_container.empty()) { + evictOne(); + } + } + + /** + * Clear the cache, resetting the memory usage. + */ + void clear() { + _container.clear(); + _memoryUsage = 0; + } + + /** + * Retrieve the set of values with key "key". If not found, returns boost::none. + */ + boost::optional<std::unordered_set<BSONObj, BSONObj::Hasher>> operator[](Value key) { + auto it = boost::multi_index::get<1>(_container).find(key); + if (it != boost::multi_index::get<1>(_container).end()) { + boost::multi_index::get<0>(_container) + .relocate(boost::multi_index::get<0>(_container).begin(), + boost::multi_index::project<0>(_container, it)); + return (*it).second; + } + return boost::none; + } + +private: + using Cached = std::pair<Value, std::unordered_set<BSONObj, BSONObj::Hasher>>; + + // boost::multi_index_container provides a system for implementing a cache. Here, we create + // a container of std::pair<Value, BSONObjSet>, that is both sequenced, and has a unique + // index on the Value. From this, we are able to evict the least-recently-used member, and + // maintain key uniqueness. + using IndexedContainer = multi_index_container< + Cached, + indexed_by<sequenced<>, hashed_unique<member<Cached, Value, &Cached::first>, Value::Hash>>>; + + IndexedContainer _container; + + size_t _memoryUsage = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/lookup_set_cache_test.cpp b/src/mongo/db/pipeline/lookup_set_cache_test.cpp new file mode 100644 index 00000000000..4d5ec28ad56 --- /dev/null +++ b/src/mongo/db/pipeline/lookup_set_cache_test.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/lookup_set_cache.h" +#include "mongo/unittest/unittest.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { + +BSONObj intToObj(int value) { + return BSON("n" << value); +} + +TEST(LookupSetCacheTest, InsertAndRetrieveWorksCorrectly) { + LookupSetCache cache; + cache.insert(Value(0), intToObj(1)); + cache.insert(Value(0), intToObj(2)); + cache.insert(Value(0), intToObj(3)); + cache.insert(Value(1), intToObj(4)); + cache.insert(Value(1), intToObj(5)); + + ASSERT(cache[Value(0)]); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(1))); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(2))); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(3))); + ASSERT_FALSE(cache[Value(0)]->count(intToObj(4))); + ASSERT_FALSE(cache[Value(0)]->count(intToObj(5))); +} + +TEST(LookupSetCacheTest, CacheDoesEvictInExpectedOrder) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + cache.insert(Value(2), intToObj(0)); + cache.insert(Value(3), intToObj(0)); + + // Cache ordering is {1: ..., 3: ..., 2: ..., 0: ...}. + cache.evictOne(); + ASSERT_FALSE(cache[Value(0)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(2)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(3)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, ReadDoesMoveKeyToFrontOfCache) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + // Cache ordering is now {1: [1], 0: [0]}. + + ASSERT_TRUE(cache[Value(0)]); + // Cache ordering is now {0: [0], 1: [1]}. + + cache.evictOne(); + ASSERT_TRUE(cache[Value(0)]); + ASSERT_FALSE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, InsertDoesPutKeyInMiddle) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + cache.insert(Value(2), intToObj(0)); + cache.insert(Value(3), intToObj(0)); + + cache.evictUntilSize(1); + + ASSERT_TRUE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, EvictDoesRespectMemoryUsage) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + + // One size_t for the key, one for the value. 
+ cache.evictDownTo(Value(1).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize())); + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_FALSE(cache[Value(0)]); +} + +TEST(LookupSetCacheTest, ComplexAccessPatternDoesBehaveCorrectly) { + LookupSetCache cache; + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 5; j++) { + cache.insert(Value(j), intToObj(i)); + } + } + + // Cache ordering is now {0: ..., 3: ..., 4: ..., 2: ..., 1: ...} + cache.evictOne(); + ASSERT_FALSE(cache[Value(0)]); + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(0))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(1))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(2))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(3))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(4))); + + cache.evictUntilSize(2); + // Cache ordering is now {1: ..., 3: ...} + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_TRUE(cache[Value(3)]); + // Cache ordering is now {3: ..., 1: ...} + + cache.evictOne(); + ASSERT_FALSE(cache[Value(1)]); + + cache.insert(Value(5), intToObj(0)); + cache.evictDownTo(Value(5).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize())); + + ASSERT_EQ(cache.size(), 1U); + ASSERT_TRUE(cache[Value(5)]); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index a41cf01f0d5..874c7f5f070 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -116,6 +116,18 @@ public: return collection->infoCache()->getIndexUsageStats(); } + bool hasUniqueIdIndex(const NamespaceString& ns) const final { + AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns()); + Collection* collection = ctx.getCollection(); + + if (!collection) { + // Collection doesn't exist; the correct return value is questionable. 
+ return false; + } + + return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx); + } + private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 18e2463be8f..4cd84115da5 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -626,6 +626,57 @@ class UnwindBeforeDoubleMatchShouldRepeatedlyOptimize : public Base { } }; +class GraphLookupShouldCoalesceWithUnwindOnAs : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: '$out'}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: " + "false}}}]"; + } +}; + +class GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$out', preserveNullAndEmptyArrays: true}}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: true}}}]"; + } +}; + +class GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$out', includeArrayIndex: 'index'}}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: false, " + " includeArrayIndex: 'index'}}}]"; + } +}; + +class GraphLookupShouldNotCoalesceWithUnwindNotOnAs : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: '$nottherightthing'}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$nottherightthing'}}]"; + } +}; + } // namespace Local namespace Sharded { @@ -1043,6 +1094,10 @@ public: add<Optimizations::Local::LookupDoesSwapWithMatchOnLocalField>(); add<Optimizations::Local::LookupDoesNotAbsorbUnwindOnSubfieldOfAsButStillMovesMatch>(); add<Optimizations::Local::LookupDoesSwapWithMatchOnFieldWithSameNameAsForeignField>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAs>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex>(); + add<Optimizations::Local::GraphLookupShouldNotCoalesceWithUnwindNotOnAs>(); add<Optimizations::Local::MatchShouldDuplicateItselfBeforeRedact>(); add<Optimizations::Local::MatchShouldSwapWithUnwind>(); add<Optimizations::Local::MatchShouldNotOptimizeWhenMatchingOnIndexField>();
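Two illustrative sketches follow; neither is part of the commit. First, the traversal implemented by doBreadthFirstSearch(), in miniature. This is a simplified model only: plain ints stand in for BSON values, an in-memory adjacency map stands in for querying the 'from' collection, and the cache, memory accounting, and the batched {$in: [...]} query are omitted.

    #include <iostream>
    #include <map>
    #include <set>
    #include <vector>

    int main() {
        // Hypothetical graph: each node's 'connectFromField' values.
        std::map<int, std::vector<int>> connectFrom = {
            {1, {2, 3}}, {2, {4}}, {3, {4}}, {4, {}}};

        std::set<int> frontier = {1};      // the evaluated 'startWith' value(s)
        std::map<int, long long> visited;  // node -> depth at first discovery

        long long depth = 0;
        while (!frontier.empty()) {
            // Mirrors '_frontier.swap(queried)': consume the current frontier.
            std::set<int> queried;
            queried.swap(frontier);

            for (int node : queried) {
                if (visited.count(node))
                    continue;  // already seen this node; don't repeat any work
                visited[node] = depth;
                // The real stage discovers these via one query matching
                // 'connectToField' against every value in the frontier.
                for (int next : connectFrom[node])
                    frontier.insert(next);
            }
            ++depth;  // a 'maxDepth' option would bound this counter
        }

        for (const auto& entry : visited)
            std::cout << "node " << entry.first << " at depth " << entry.second << "\n";
    }

Running this discovers node 1 at depth 0, nodes 2 and 3 at depth 1, and node 4 at depth 2, analogous to how the stage assigns 'depthField' values.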
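Second, the LookupSetCache contract restated as straight-line code. Nothing here goes beyond what lookup_set_cache_test.cpp above already asserts; it assumes the mongo headers for LookupSetCache, the BSON macro, and invariant are available.

    LookupSetCache cache;
    cache.insert(Value(0), BSON("n" << 1));  // new key: inserted mid-sequence
    cache.insert(Value(0), BSON("n" << 2));  // existing key: value joins its set
    cache.insert(Value(1), BSON("n" << 3));

    // operator[] returns boost::none on a miss and, on a hit, moves the key to
    // the most-recently-used end of the sequence.
    if (auto hit = cache[Value(0)]) {
        invariant(hit->count(BSON("n" << 1)) == 1U);
    }

    cache.evictOne();      // drops the least-recently-used key and its whole set
    cache.evictDownTo(0);  // evicts until memory usage fits the given byte budget

The insert-in-the-middle policy explains the eviction order that the CacheDoesEvictInExpectedOrder test documents: a freshly inserted key is neither protected at the front nor immediately at risk at the back.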