From 34ae8f620cda69798230cf3d9e3986e79580fa6f Mon Sep 17 00:00:00 2001 From: Benjamin Murphy Date: Fri, 29 Jan 2016 13:27:59 -0500 Subject: SERVER-19542 Refactored pipeline optimization. --- src/mongo/db/pipeline/document_source.cpp | 4 - src/mongo/db/pipeline/document_source.h | 375 ++++++++++++--------- src/mongo/db/pipeline/document_source_cursor.cpp | 29 +- src/mongo/db/pipeline/document_source_geo_near.cpp | 24 +- src/mongo/db/pipeline/document_source_limit.cpp | 19 +- src/mongo/db/pipeline/document_source_lookup.cpp | 25 +- src/mongo/db/pipeline/document_source_match.cpp | 40 +-- src/mongo/db/pipeline/document_source_project.cpp | 16 + src/mongo/db/pipeline/document_source_redact.cpp | 26 ++ src/mongo/db/pipeline/document_source_skip.cpp | 33 +- src/mongo/db/pipeline/document_source_sort.cpp | 34 +- src/mongo/db/pipeline/document_source_test.cpp | 118 ++++++- src/mongo/db/pipeline/pipeline.cpp | 165 ++------- src/mongo/db/pipeline/pipeline.h | 10 +- src/mongo/db/pipeline/pipeline_d.cpp | 11 +- src/mongo/db/pipeline/pipeline_optimizations.h | 61 ---- src/mongo/db/pipeline/pipeline_test.cpp | 41 ++- 17 files changed, 554 insertions(+), 477 deletions(-) (limited to 'src/mongo/db/pipeline') diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 803d19c1884..da3ec5fb33c 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -83,10 +83,6 @@ void DocumentSource::setSource(DocumentSource* pTheSource) { pSource = pTheSource; } -bool DocumentSource::coalesce(const intrusive_ptr& pNextSource) { - return false; -} - intrusive_ptr DocumentSource::optimize() { return this; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index e5c7f40718a..b40a4716cbb 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -50,6 +50,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/sorter/sorter.h" #include "mongo/stdx/functional.h" @@ -62,6 +63,7 @@ class Expression; class ExpressionFieldPath; class ExpressionObject; class DocumentSourceLimit; +class DocumentSourceSort; class PlanExecutor; class RecordCursor; @@ -125,27 +127,11 @@ public: */ virtual void setSource(DocumentSource* pSource); - /** - Attempt to coalesce this DocumentSource with its successor in the - document processing pipeline. If successful, the successor - DocumentSource should be removed from the pipeline and discarded. - - If successful, this operation can be applied repeatedly, in an - attempt to coalesce several sources together. - - The default implementation is to do nothing, and return false. - - @param pNextSource the next source in the document processing chain. - @returns whether or not the attempt to coalesce was successful or not; - if the attempt was not successful, nothing has been changed - */ - virtual bool coalesce(const boost::intrusive_ptr& pNextSource); - /** * Returns an optimized DocumentSource that is semantically equivalent to this one, or * nullptr if this stage is a no-op. Implementations are allowed to modify themselves - * in-place and return a pointer to themselves. For best results, first coalesce compatible - * sources using coalesce(). + * in-place and return a pointer to themselves. 
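
To make the new contract concrete, the following standalone sketch models the optimizeAt() protocol this patch introduces (illustrative only; Stage, Limit, and runOptimizations are invented names, not MongoDB types). A stage may erase or reorder its neighbors, and the iterator it returns tells the driving loop where to resume, so a stage that has just gained a new neighbor gets re-examined:

#include <algorithm>
#include <iostream>
#include <iterator>
#include <list>
#include <memory>

struct Stage {
    using Container = std::list<std::shared_ptr<Stage>>;
    virtual ~Stage() = default;
    // Default: no rewrite; tell the driver loop to look at the next stage.
    virtual Container::iterator optimizeAt(Container::iterator itr, Container*) {
        return std::next(itr);
    }
};

struct Limit : Stage {
    long long limit;
    explicit Limit(long long n) : limit(n) {}
    Container::iterator optimizeAt(Container::iterator itr, Container* container) override {
        if (auto* next = dynamic_cast<Limit*>(std::next(itr)->get())) {
            limit = std::min(limit, next->limit);  // keep the tighter bound
            container->erase(std::next(itr));
            return itr;  // re-examine this position: a new neighbor is now adjacent
        }
        return std::next(itr);
    }
};

// Mirrors the new Pipeline::optimizePipeline() driver loop: stop when the
// current stage is the last one, since optimizeAt() inspects its successor.
void runOptimizations(Stage::Container* sources) {
    auto itr = sources->begin();
    while (itr != sources->end() && std::next(itr) != sources->end()) {
        itr = (*itr)->optimizeAt(itr, sources);
    }
}

int main() {
    Stage::Container pipeline{
        std::make_shared<Limit>(10), std::make_shared<Limit>(7), std::make_shared<Limit>(12)};
    runOptimizations(&pipeline);
    std::cout << pipeline.size() << "\n";                                     // 1
    std::cout << static_cast<Limit*>(pipeline.front().get())->limit << "\n";  // 7
}
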
For best results, first optimize the pipeline + * with the optimizePipeline() method defined in pipeline.cpp. * * This is intended for any operations that include expressions, and provides a hook for * those to optimize those operations. @@ -154,6 +140,22 @@ public: */ virtual boost::intrusive_ptr optimize(); + /** + * Attempt to perform an optimization with the following source in the pipeline. 'container' + * refers to the entire pipeline, and 'itr' points to this stage within the pipeline. + * + * The return value is an iterator over the same container which points to the first location + * in the container at which an optimization may be possible. + * + * For example, if a swap takes place, the returned iterator should just be the position + * directly preceding 'itr', if such a position exists, since the stage at that position may be + * able to perform further optimizations with its new neighbor. + */ + virtual Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + return std::next(itr); + }; + enum GetDepsReturn { NOT_SUPPORTED = 0x0, // The full object and all metadata may be required SEE_NEXT = 0x1, // Later stages could need either fields or metadata @@ -321,8 +323,12 @@ public: ~DocumentSourceCursor() final; boost::optional getNext() final; const char* getSourceName() const final; + /** + * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; Value serialize(bool explain = false) const final; - bool coalesce(const boost::intrusive_ptr& nextSource) final; bool isValidInitialSource() const final { return true; } @@ -561,9 +567,14 @@ public: // virtuals from DocumentSource boost::optional getNext() final; const char* getSourceName() const final; - bool coalesce(const boost::intrusive_ptr& nextSource) final; Value serialize(bool explain = false) const final; boost::intrusive_ptr optimize() final; + /** + * Attempts to combine with any subsequent $match stages, joining the query objects with a + * $and. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; void setSource(DocumentSource* Source) final; /** @@ -750,6 +761,12 @@ public: // virtuals from DocumentSource boost::optional getNext() final; const char* getSourceName() const final; + /** + * Attempt to move a subsequent $skip or $limit stage before the $project, thus reducing the + * number of documents that pass through this stage. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; boost::intrusive_ptr optimize() final; Value serialize(bool explain = false) const final; @@ -789,6 +806,13 @@ public: const char* getSourceName() const final; boost::intrusive_ptr optimize() final; + /** + * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact + * stage. 
+ */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& expCtx); @@ -807,139 +831,6 @@ private: boost::intrusive_ptr _expression; }; -class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource { -public: - // virtuals from DocumentSource - boost::optional getNext() final; - const char* getSourceName() const final; - void serializeToArray(std::vector& array, bool explain = false) const final; - bool coalesce(const boost::intrusive_ptr& pNextSource) final; - void dispose() final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final; - - boost::intrusive_ptr getShardSource() final; - boost::intrusive_ptr getMergeSource() final; - - /** - Add sort key field. - - Adds a sort key field to the key being built up. A concatenated - key is built up by calling this repeatedly. - - @param fieldPath the field path to the key component - @param ascending if true, use the key for an ascending sort, - otherwise, use it for descending - */ - void addKey(const std::string& fieldPath, bool ascending); - - /// Write out a Document whose contents are the sort key. - Document serializeSortKey(bool explain) const; - - /** - Create a sorting DocumentSource from BSON. - - This is a convenience method that uses the above, and operates on - a BSONElement that has been deteremined to be an Object with an - element named $group. - - @param pBsonElement the BSONELement that defines the group - @param pExpCtx the expression context for the pipeline - @returns the grouping DocumentSource - */ - static boost::intrusive_ptr createFromBson( - BSONElement elem, const boost::intrusive_ptr& pExpCtx); - - /// Create a DocumentSourceSort with a given sort and (optional) limit - static boost::intrusive_ptr create( - const boost::intrusive_ptr& pExpCtx, - BSONObj sortOrder, - long long limit = -1); - - /// returns -1 for no limit - long long getLimit() const; - - /** - * Loads a document to be sorted. This can be used to sort a stream of documents that are not - * coming from another DocumentSource. Once all documents have been added, the caller must call - * loadingDone() before using getNext() to receive the documents in sorted order. - */ - void loadDocument(const Document& doc); - - /** - * Signals to the sort stage that there will be no more input documents. It is an error to call - * loadDocument() once this method returns. - */ - void loadingDone(); - - /** - * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that - * have already been sorted. - */ - void populateFromCursors(const std::vector& cursors); - - bool isPopulated() { - return populated; - }; - - boost::intrusive_ptr getLimitSrc() const { - return limitSrc; - } - -private: - explicit DocumentSourceSort(const boost::intrusive_ptr& pExpCtx); - - Value serialize(bool explain = false) const final { - verify(false); // should call addToBsonArray instead - } - - /* - Before returning anything, this source must fetch everything from - the underlying source and group it. populate() is used to do that - on the first call to any method on this source. The populated - boolean indicates that this has been done. - */ - void populate(); - bool populated; - - SortOptions makeSortOptions() const; - - // This is used to merge pre-sorted results from a DocumentSourceMergeCursors. 
-    class IteratorFromCursor;
-
-    /* these two parallel each other */
-    typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
-    SortKey vSortKey;
-    std::vector<char> vAscending;  // used like std::vector<bool> but without specialization
-
-    /// Extracts the fields in vSortKey from the Document;
-    Value extractKey(const Document& d) const;
-
-    /// Compare two Values according to the specified sort key.
-    int compare(const Value& lhs, const Value& rhs) const;
-
-    typedef Sorter<Value, Document> MySorter;
-
-    // For MySorter
-    class Comparator {
-    public:
-        explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
-        int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
-            return _source.compare(lhs.first, rhs.first);
-        }
-
-    private:
-        const DocumentSourceSort& _source;
-    };
-
-    boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
-
-    bool _done;
-    bool _mergingPresorted;
-    std::unique_ptr<MySorter> _sorter;
-    std::unique_ptr<MySorter::Iterator> _output;
-};
-
 class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource {
 public:
     boost::optional<Document> getNext() final;
@@ -1022,7 +913,11 @@ public:
     // virtuals from DocumentSource
     boost::optional<Document> getNext() final;
     const char* getSourceName() const final;
-    bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
+    /**
+     * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
+     */
+    Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+                                                   Pipeline::SourceContainer* container) final;
     Value serialize(bool explain = false) const final;

     GetDepsReturn getDependencies(DepsTracker* deps) const final {
@@ -1075,12 +970,166 @@ private:
     long long count;
 };

+class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
+public:
+    // virtuals from DocumentSource
+    boost::optional<Document> getNext() final;
+    const char* getSourceName() const final;
+    void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+    /**
+     * Attempts to move a subsequent $match stage before the $sort, reducing the number of
+     * documents that pass through the stage. Also attempts to absorb a subsequent $limit stage so
+     * that it can perform a top-k sort.
+     */
+    Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+                                                   Pipeline::SourceContainer* container) final;
+    void dispose() final;
+
+    GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+    boost::intrusive_ptr<DocumentSource> getShardSource() final;
+    boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+    /**
+       Add sort key field.
+
+       Adds a sort key field to the key being built up. A concatenated
+       key is built up by calling this repeatedly.
+
+       @param fieldPath the field path to the key component
+       @param ascending if true, use the key for an ascending sort,
+         otherwise, use it for descending
+     */
+    void addKey(const std::string& fieldPath, bool ascending);
+
+    /// Write out a Document whose contents are the sort key.
+    Document serializeSortKey(bool explain) const;
+
+    /**
+       Create a sorting DocumentSource from BSON.
+
+       This is a convenience method that uses the above, and operates on
+       a BSONElement that has been determined to be an Object with an
+       element named $sort.
+
+       @param pBsonElement the BSONElement that defines the sort
+       @param pExpCtx the expression context for the pipeline
+       @returns the sorting DocumentSource
+     */
+    static boost::intrusive_ptr<DocumentSource> createFromBson(
+        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+    /// Create a DocumentSourceSort with a given sort and (optional) limit
+    static boost::intrusive_ptr<DocumentSourceSort> create(
+        const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+        BSONObj sortOrder,
+        long long limit = -1);
+
+    /// returns -1 for no limit
+    long long getLimit() const;
+
+    /**
+     * Loads a document to be sorted. This can be used to sort a stream of documents that are not
+     * coming from another DocumentSource. Once all documents have been added, the caller must call
+     * loadingDone() before using getNext() to receive the documents in sorted order.
+     */
+    void loadDocument(const Document& doc);
+
+    /**
+     * Signals to the sort stage that there will be no more input documents. It is an error to call
+     * loadDocument() once this method returns.
+     */
+    void loadingDone();
+
+    /**
+     * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
+     * have already been sorted.
+     */
+    void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
+
+    bool isPopulated() {
+        return populated;
+    };
+
+    boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
+        return limitSrc;
+    }
+
+private:
+    explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+    Value serialize(bool explain = false) const final {
+        verify(false);  // should call serializeToArray instead
+    }
+
+    /*
+      Before returning anything, this source must fetch everything from
+      the underlying source and sort it. populate() is used to do that
+      on the first call to any method on this source. The populated
+      boolean indicates that this has been done.
+     */
+    void populate();
+    bool populated;
+
+    SortOptions makeSortOptions() const;
+
+    // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
+    class IteratorFromCursor;
+
+    /* these two parallel each other */
+    typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
+    SortKey vSortKey;
+    std::vector<char> vAscending;  // used like std::vector<bool> but without specialization
+
+    /// Extracts the fields in vSortKey from the Document;
+    Value extractKey(const Document& d) const;
+
+    /// Compare two Values according to the specified sort key.
+    int compare(const Value& lhs, const Value& rhs) const;
+
+    typedef Sorter<Value, Document> MySorter;
+
+    /**
+     * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times; it will
+     * keep the smallest limit.
+     */
+    void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) {
+        if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) {
+            limitSrc = limit;
+        }
+    }
+
+    // For MySorter
+    class Comparator {
+    public:
+        explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
+        int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
+            return _source.compare(lhs.first, rhs.first);
+        }
+
+    private:
+        const DocumentSourceSort& _source;
+    };
+
+    boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
+
+    bool _done;
+    bool _mergingPresorted;
+    std::unique_ptr<MySorter> _sorter;
+    std::unique_ptr<MySorter::Iterator> _output;
+};
+
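
The setLimitSrc() hook above is what turns an absorbed $limit into a top-k sort: the sorter then only needs to retain the k best documents instead of materializing its whole input. A standalone sketch of that idea (plain ints and a std::priority_queue stand in for Documents and the stage's Sorter; topK is an invented name):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <queue>
#include <vector>

// Keep only the k smallest values seen, using a bounded max-heap.
std::vector<int> topK(const std::vector<int>& input, std::size_t k) {
    std::priority_queue<int> heap;  // max-heap: top() is the worst value retained
    for (int v : input) {
        if (heap.size() < k) {
            heap.push(v);
        } else if (v < heap.top()) {
            heap.pop();    // evict the current worst
            heap.push(v);  // admit the better value
        }
    }
    std::vector<int> out;
    for (; !heap.empty(); heap.pop()) {
        out.push_back(heap.top());
    }
    std::reverse(out.begin(), out.end());  // ascending order
    return out;
}

int main() {
    for (int v : topK({9, 1, 8, 2, 7, 3}, 3)) {
        std::cout << v << ' ';  // prints: 1 2 3
    }
    std::cout << '\n';
}
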
 class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource {
 public:
     // virtuals from DocumentSource
     boost::optional<Document> getNext() final;
     const char* getSourceName() const final;
-    bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
+    /**
+     * Attempts to move a subsequent $limit before the skip, potentially allowing for further
+     * optimizations earlier in the pipeline.
+ */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; Value serialize(bool explain = false) const final; boost::intrusive_ptr optimize() final; @@ -1192,10 +1241,17 @@ class DocumentSourceGeoNear : public DocumentSource, public SplittableDocumentSource, public DocumentSourceNeedsMongod { public: + static const long long kDefaultLimit; + // virtuals from DocumentSource boost::optional getNext() final; const char* getSourceName() const final; - bool coalesce(const boost::intrusive_ptr& pNextSource) final; + /** + * Attempts to combine with a subsequent limit stage, setting the internal limit field + * as a result. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; bool isValidInitialSource() const final { return true; } @@ -1253,8 +1309,13 @@ class DocumentSourceLookUp final : public DocumentSource, public: boost::optional getNext() final; const char* getSourceName() const final; - bool coalesce(const boost::intrusive_ptr& pNextSource) final; void serializeToArray(std::vector& array, bool explain = false) const final; + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' + * field. + */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; GetDepsReturn getDependencies(DepsTracker* deps) const final; void dispose() final; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 65cae6448ce..e851f14cf93 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -139,21 +139,26 @@ long long DocumentSourceCursor::getLimit() const { return _limit ? _limit->getLimit() : -1; } -bool DocumentSourceCursor::coalesce(const intrusive_ptr& nextSource) { - // Note: Currently we assume the $limit is logically after any $sort or - // $match. If we ever pull in $match or $sort using this method, we - // will need to keep track of the order of the sub-stages. - - if (!_limit) { - _limit = dynamic_cast(nextSource.get()); - return _limit.get(); // false if next is not a $limit - } else { - return _limit->coalesce(nextSource); - } +Pipeline::SourceContainer::iterator DocumentSourceCursor::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextLimit = dynamic_cast((*std::next(itr)).get()); - return false; + if (nextLimit) { + if (_limit) { + // We already have an internal limit, set it to the more restrictive of the two. 
+ _limit->setLimit(std::min(_limit->getLimit(), nextLimit->getLimit())); + } else { + _limit = nextLimit; + } + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); } + Value DocumentSourceCursor::serialize(bool explain) const { // we never parse a documentSourceCursor, so we only serialize for explain if (!explain) diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 2279eb88261..da30c2adaa3 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -37,10 +37,11 @@ namespace mongo { using boost::intrusive_ptr; -using std::min; REGISTER_DOCUMENT_SOURCE(geoNear, DocumentSourceGeoNear::createFromBson); +const long long DocumentSourceGeoNear::kDefaultLimit = 100; + const char* DocumentSourceGeoNear::getSourceName() const { return "$geoNear"; } @@ -66,14 +67,19 @@ boost::optional DocumentSourceGeoNear::getNext() { return output.freeze(); } -bool DocumentSourceGeoNear::coalesce(const intrusive_ptr& pNextSource) { - DocumentSourceLimit* limitSrc = dynamic_cast(pNextSource.get()); - if (limitSrc) { - limit = min(limit, limitSrc->getLimit()); - return true; - } +Pipeline::SourceContainer::iterator DocumentSourceGeoNear::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); - return false; + auto nextLimit = dynamic_cast((*std::next(itr)).get()); + + if (nextLimit) { + // If the next stage is a $limit, we can combine it with ourselves. + limit = std::min(limit, nextLimit->getLimit()); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); } // This command is sent as-is to the shards. @@ -219,7 +225,7 @@ void DocumentSourceGeoNear::parseOptions(BSONObj options) { DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr& pExpCtx) : DocumentSource(pExpCtx), coordsIsArray(false), - limit(100), + limit(DocumentSourceGeoNear::kDefaultLimit), maxDistance(-1.0), minDistance(-1.0), spherical(false), diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index b7e09785734..401afc945ff 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -49,17 +49,18 @@ const char* DocumentSourceLimit::getSourceName() const { return "$limit"; } -bool DocumentSourceLimit::coalesce(const intrusive_ptr& pNextSource) { - DocumentSourceLimit* pLimit = dynamic_cast(pNextSource.get()); +Pipeline::SourceContainer::iterator DocumentSourceLimit::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); - /* if it's not another $limit, we can't coalesce */ - if (!pLimit) - return false; + auto nextLimit = dynamic_cast((*std::next(itr)).get()); - /* we need to limit by the minimum of the two limits */ - if (pLimit->limit < limit) - limit = pLimit->limit; - return true; + if (nextLimit) { + limit = std::min(limit, nextLimit->limit); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); } boost::optional DocumentSourceLimit::getNext() { diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 83708c8756c..f9927de1e6f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -92,18 +92,21 @@ boost::optional DocumentSourceLookUp::getNext() { return 
output.freeze(); } -bool DocumentSourceLookUp::coalesce(const intrusive_ptr& pNextSource) { - if (_handlingUnwind) { - return false; - } - - auto unwindSrc = dynamic_cast(pNextSource.get()); - if (!unwindSrc || unwindSrc->getUnwindPath() != _as.getPath(false)) { - return false; +Pipeline::SourceContainer::iterator DocumentSourceLookUp::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextUnwind = dynamic_cast((*std::next(itr)).get()); + + // If we are not already handling an $unwind stage internally, we can combine with the + // following $unwind stage. + if (nextUnwind && !_handlingUnwind && nextUnwind->getUnwindPath() == _as.getPath(false)) { + _unwindSrc = std::move(nextUnwind); + _handlingUnwind = true; + container->erase(std::next(itr)); + return itr; } - _unwindSrc = std::move(unwindSrc); - _handlingUnwind = true; - return true; + return std::next(itr); } void DocumentSourceLookUp::dispose() { diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index 0458a7474be..f44c3f9dca3 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -74,32 +74,22 @@ boost::optional DocumentSourceMatch::getNext() { return boost::none; } -bool DocumentSourceMatch::coalesce(const intrusive_ptr& nextSource) { - DocumentSourceMatch* otherMatch = dynamic_cast(nextSource.get()); - if (!otherMatch) - return false; - - if (otherMatch->_isTextQuery) { - // Non-initial text queries are disallowed (enforced by setSource below). This prevents - // "hiding" a non-initial text query by combining it with another match. - return false; - - // The rest of this block is for once we support non-initial text queries. - - if (_isTextQuery) { - // The score should only come from the last $match. We can't combine since then this - // match's score would impact otherMatch's. - return false; - } - - _isTextQuery = true; +Pipeline::SourceContainer::iterator DocumentSourceMatch::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextMatch = dynamic_cast((*std::next(itr)).get()); + + // Since a text search must use an index, it must be the first stage in the pipeline. We cannot + // combine a non-text stage with a text stage, as that may turn an invalid pipeline into a + // valid one, unbeknownst to the user. + if (nextMatch && !nextMatch->_isTextQuery) { + matcher.reset(new Matcher(BSON("$and" << BSON_ARRAY(getQuery() << nextMatch->getQuery())), + ExtensionsCallbackNoop())); + container->erase(std::next(itr)); + return itr; } - - // Replace our matcher with the $and of ours and theirs. 
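
The rewrite above is pure predicate conjunction: two adjacent $match stages filter exactly like a single $match over the $and of their queries, but cost one pass instead of two. (The $text exception is about index eligibility, not predicate logic.) A minimal standalone model, with std::function predicates standing in for Matchers:

#include <functional>
#include <iostream>

using Pred = std::function<bool(int a, int b)>;

// Combining two $match stages is just &&-ing their predicates.
Pred combineMatches(const Pred& first, const Pred& second) {
    return [first, second](int a, int b) { return first(a, b) && second(a, b); };
}

int main() {
    Pred matchA = [](int a, int) { return a == 1; };  // {$match: {a: 1}}
    Pred matchB = [](int, int b) { return b == 1; };  // {$match: {b: 1}}
    Pred both = combineMatches(matchA, matchB);       // {$match: {$and: [{a: 1}, {b: 1}]}}
    std::cout << both(1, 1) << both(1, 2) << '\n';    // prints: 10
}
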
- matcher.reset(new Matcher(BSON("$and" << BSON_ARRAY(getQuery() << otherMatch->getQuery())), - ExtensionsCallbackNoop())); - - return true; + return std::next(itr); } namespace { diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index a0a81df8610..462621bc79e 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -83,6 +83,22 @@ intrusive_ptr DocumentSourceProject::optimize() { return this; } +Pipeline::SourceContainer::iterator DocumentSourceProject::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextSkip = dynamic_cast((*std::next(itr)).get()); + auto nextLimit = dynamic_cast((*std::next(itr)).get()); + + if (nextSkip || nextLimit) { + // Swap the $limit/$skip before ourselves, thus reducing the number of documents that + // pass through the $project. + std::swap(*itr, *std::next(itr)); + return itr == container->begin() ? itr : std::prev(itr); + } + return std::next(itr); +} + Value DocumentSourceProject::serialize(bool explain) const { return Value(DOC(getSourceName() << pEO->serialize(explain))); } diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp index 7067d27930a..b1d2c9e54f2 100644 --- a/src/mongo/db/pipeline/document_source_redact.cpp +++ b/src/mongo/db/pipeline/document_source_redact.cpp @@ -68,6 +68,32 @@ boost::optional DocumentSourceRedact::getNext() { return boost::none; } +Pipeline::SourceContainer::iterator DocumentSourceRedact::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextMatch = dynamic_cast((*std::next(itr)).get()); + + if (nextMatch) { + const BSONObj redactSafePortion = nextMatch->redactSafePortion(); + + if (!redactSafePortion.isEmpty()) { + // Because R-M turns into M-R-M without modifying the original $match, we cannot step + // backwards and optimize from before the $redact, otherwise this will just loop and + // create an infinite number of $matches. + Pipeline::SourceContainer::iterator returnItr = std::next(itr); + + container->insert( + itr, + DocumentSourceMatch::createFromBson( + BSON("$match" << redactSafePortion).firstElement(), this->pExpCtx)); + + return returnItr; + } + } + return std::next(itr); +} + Value DocumentSourceRedact::redactValue(const Value& in) { const BSONType valueType = in.getType(); if (valueType == Object) { diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index ff2e5d37161..4b3d4ec168d 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -48,18 +48,6 @@ const char* DocumentSourceSkip::getSourceName() const { return "$skip"; } -bool DocumentSourceSkip::coalesce(const intrusive_ptr& pNextSource) { - DocumentSourceSkip* pSkip = dynamic_cast(pNextSource.get()); - - /* if it's not another $skip, we can't coalesce */ - if (!pSkip) - return false; - - /* we need to skip over the sum of the two consecutive $skips */ - _skip += pSkip->_skip; - return true; -} - boost::optional DocumentSourceSkip::getNext() { pExpCtx->checkForInterrupt(); @@ -82,6 +70,27 @@ intrusive_ptr DocumentSourceSkip::optimize() { return _skip == 0 ? 
nullptr : this; } +Pipeline::SourceContainer::iterator DocumentSourceSkip::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextLimit = dynamic_cast((*std::next(itr)).get()); + auto nextSkip = dynamic_cast((*std::next(itr)).get()); + + if (nextLimit) { + // Swap the $limit before this stage, allowing a top-k sort to be possible, provided there + // is a $sort stage. + nextLimit->setLimit(nextLimit->getLimit() + _skip); + std::swap(*itr, *std::next(itr)); + return itr == container->begin() ? itr : std::prev(itr); + } else if (nextSkip) { + _skip += nextSkip->getSkip(); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); +} + intrusive_ptr DocumentSourceSkip::create( const intrusive_ptr& pExpCtx) { intrusive_ptr pSource(new DocumentSourceSkip(pExpCtx)); diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index df128d4472e..af32cde08d9 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -101,15 +101,6 @@ long long DocumentSourceSort::getLimit() const { return limitSrc ? limitSrc->getLimit() : -1; } -bool DocumentSourceSort::coalesce(const intrusive_ptr& pNextSource) { - if (!limitSrc) { - limitSrc = dynamic_cast(pNextSource.get()); - return limitSrc.get(); // false if next is not a $limit - } else { - return limitSrc->coalesce(pNextSource); - } -} - void DocumentSourceSort::addKey(const string& fieldPath, bool ascending) { VariablesIdGenerator idGenerator; VariablesParseState vps(&idGenerator); @@ -139,6 +130,27 @@ Document DocumentSourceSort::serializeSortKey(bool explain) const { return keyObj.freeze(); } +Pipeline::SourceContainer::iterator DocumentSourceSort::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + auto nextMatch = dynamic_cast((*std::next(itr)).get()); + auto nextLimit = dynamic_cast((*std::next(itr)).get()); + + if (nextLimit) { + // If the following stage is a $limit, we can combine it with ourselves. + setLimitSrc(nextLimit); + container->erase(std::next(itr)); + return itr; + } else if (nextMatch && !nextMatch->isTextQuery()) { + // Swap the $match before the $sort, thus reducing the number of documents that pass into + // this stage. + std::swap(*itr, *std::next(itr)); + return itr == container->begin() ? 
itr : std::prev(itr); + } + return std::next(itr); +} + DocumentSource::GetDepsReturn DocumentSourceSort::getDependencies(DepsTracker* deps) const { for (size_t i = 0; i < vSortKey.size(); ++i) { vSortKey[i]->addDependencies(deps); @@ -201,9 +213,7 @@ intrusive_ptr DocumentSourceSort::create( uassert(15976, "$sort stage must have at least one sort key", !pSort->vSortKey.empty()); if (limit > 0) { - bool coalesced = pSort->coalesce(DocumentSourceLimit::create(pExpCtx, limit)); - verify(coalesced); // should always coalesce - verify(pSort->getLimit() == limit); + pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit)); } return pSort; diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index bdf9ac79a39..c34cbe6d457 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_noop.h" #include "mongo/stdx/memory.h" @@ -215,6 +216,46 @@ TEST(Mock, Empty) { } // namespace Mock +namespace DocumentSourceRedact { +using mongo::DocumentSourceRedact; +using mongo::DocumentSourceMatch; +using mongo::DocumentSourceMock; + +class Base : public Mock::Base { +protected: + void createRedact() { + BSONObj spec = BSON("$redact" + << "$$PRUNE"); + _redact = DocumentSourceRedact::createFromBson(spec.firstElement(), ctx()); + } + + DocumentSource* redact() { + return _redact.get(); + } + +private: + intrusive_ptr _redact; +}; + +class PromoteMatch : public Base { +public: + void run() { + createRedact(); + + auto match = DocumentSourceMatch::createFromBson(BSON("a" << 1).firstElement(), ctx()); + + Pipeline::SourceContainer pipeline; + pipeline.push_back(redact()); + pipeline.push_back(match); + + pipeline.front()->optimizeAt(pipeline.begin(), &pipeline); + + ASSERT_EQUALS(pipeline.size(), 4); + ASSERT(dynamic_cast(pipeline.front().get())); + } +}; +} // namespace DocumentSourceRedact + namespace DocumentSourceLimit { using mongo::DocumentSourceLimit; @@ -251,6 +292,25 @@ public: } }; +/** Combine two $limit stages. */ +class CombineLimit : public Base { +public: + void run() { + Pipeline::SourceContainer container; + createLimit(10); + + auto secondLimit = + DocumentSourceLimit::createFromBson(BSON("$limit" << 5).firstElement(), ctx()); + + container.push_back(limit()); + container.push_back(secondLimit); + + limit()->optimizeAt(container.begin(), &container); + ASSERT_EQUALS(5, static_cast(limit())->getLimit()); + ASSERT_EQUALS(1, container.size()); + } +}; + /** Exhausting a DocumentSourceLimit disposes of the pipeline's source. 
*/ class DisposeSourceCascade : public Base { public: @@ -1402,6 +1462,9 @@ public: createSort(BSON("a" << 1)); ASSERT_EQUALS(sort()->getLimit(), -1); + Pipeline::SourceContainer container; + container.push_back(sort()); + { // pre-limit checks vector arr; sort()->serializeToArray(arr); @@ -1411,12 +1474,22 @@ public: ASSERT(sort()->getMergeSource() != NULL); } - ASSERT_TRUE(sort()->coalesce(mkLimit(10))); + container.push_back(mkLimit(10)); + sort()->optimizeAt(container.begin(), &container); + ASSERT_EQUALS(container.size(), 1); ASSERT_EQUALS(sort()->getLimit(), 10); - ASSERT_TRUE(sort()->coalesce(mkLimit(15))); - ASSERT_EQUALS(sort()->getLimit(), 10); // unchanged - ASSERT_TRUE(sort()->coalesce(mkLimit(5))); - ASSERT_EQUALS(sort()->getLimit(), 5); // reduced + + // unchanged + container.push_back(mkLimit(15)); + sort()->optimizeAt(container.begin(), &container); + ASSERT_EQUALS(container.size(), 1); + ASSERT_EQUALS(sort()->getLimit(), 10); + + // reduced + container.push_back(mkLimit(5)); + sort()->optimizeAt(container.begin(), &container); + ASSERT_EQUALS(container.size(), 1); + ASSERT_EQUALS(sort()->getLimit(), 5); vector arr; sort()->serializeToArray(arr); @@ -2477,15 +2550,27 @@ public: void run() { intrusive_ptr geoNear = DocumentSourceGeoNear::create(ctx()); - ASSERT_EQUALS(geoNear->getLimit(), 100); + Pipeline::SourceContainer container; + container.push_back(geoNear); + + ASSERT_EQUALS(geoNear->getLimit(), DocumentSourceGeoNear::kDefaultLimit); + + container.push_back(DocumentSourceLimit::create(ctx(), 200)); + geoNear->optimizeAt(container.begin(), &container); - ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 200))); - ASSERT_EQUALS(geoNear->getLimit(), 100); + ASSERT_EQUALS(container.size(), 1); + ASSERT_EQUALS(geoNear->getLimit(), DocumentSourceGeoNear::kDefaultLimit); - ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 50))); + container.push_back(DocumentSourceLimit::create(ctx(), 50)); + geoNear->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(container.size(), 1); ASSERT_EQUALS(geoNear->getLimit(), 50); - ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 30))); + container.push_back(DocumentSourceLimit::create(ctx(), 30)); + geoNear->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(container.size(), 1); ASSERT_EQUALS(geoNear->getLimit(), 30); } }; @@ -2644,15 +2729,23 @@ public: intrusive_ptr match2 = makeMatch(BSON("b" << 1)); intrusive_ptr match3 = makeMatch(BSON("c" << 1)); + Pipeline::SourceContainer container; + // Check initial state ASSERT_EQUALS(match1->getQuery(), BSON("a" << 1)); ASSERT_EQUALS(match2->getQuery(), BSON("b" << 1)); ASSERT_EQUALS(match3->getQuery(), BSON("c" << 1)); - ASSERT(match1->coalesce(match2)); + container.push_back(match1); + container.push_back(match2); + match1->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(container.size(), 1); ASSERT_EQUALS(match1->getQuery(), fromjson("{'$and': [{a:1}, {b:1}]}")); - ASSERT(match1->coalesce(match3)); + container.push_back(match3); + match1->optimizeAt(container.begin(), &container); + ASSERT_EQUALS(container.size(), 1); ASSERT_EQUALS(match1->getQuery(), fromjson( "{'$and': [{'$and': [{a:1}, {b:1}]}," @@ -2668,6 +2761,7 @@ public: add(); add(); + add(); add(); add(); diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index e237f35a61e..e4a9e7d1e1d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -173,151 +173,28 @@ intrusive_ptr 
Pipeline::parseCommand(string& errmsg, } } - // The order in which optimizations are applied can have significant impact on the - // efficiency of the final pipeline. Be Careful! - Optimizations::Local::moveMatchBeforeSort(pPipeline.get()); - Optimizations::Local::moveSkipAndLimitBeforeProject(pPipeline.get()); - Optimizations::Local::moveLimitBeforeSkip(pPipeline.get()); - Optimizations::Local::coalesceAdjacent(pPipeline.get()); - Optimizations::Local::optimizeEachDocumentSource(pPipeline.get()); - Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get()); + pPipeline->optimizePipeline(); return pPipeline; } -void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) { - // TODO Keep moving matches across multiple sorts as moveLimitBeforeSkip does below. - // TODO Check sort for limit. Not an issue currently due to order optimizations are applied, - // but should be fixed. - SourceContainer& sources = pipeline->sources; - for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) { - intrusive_ptr& pSource = sources[srci]; - DocumentSourceMatch* match = dynamic_cast(pSource.get()); - if (match && !match->isTextQuery()) { - intrusive_ptr& pPrevious = sources[srci - 1]; - if (dynamic_cast(pPrevious.get())) { - /* swap this item with the previous */ - intrusive_ptr pTemp(pPrevious); - pPrevious = pSource; - pSource = pTemp; - } - } - } -} - -void Pipeline::Optimizations::Local::moveSkipAndLimitBeforeProject(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) - return; +void Pipeline::optimizePipeline() { + SourceContainer optimizedSources; - for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { - // This optimization only applies when a $project comes before a $skip or $limit. - auto project = dynamic_cast(sources[i - 1].get()); - if (!project) - continue; + SourceContainer::iterator itr = sources.begin(); - auto skip = dynamic_cast(sources[i].get()); - auto limit = dynamic_cast(sources[i].get()); - if (!(skip || limit)) - continue; - - swap(sources[i], sources[i - 1]); - - // Start at back again. This is needed to handle cases with more than 1 $skip or - // $limit (S means skip, L means limit, P means project) - // - // These would work without second pass (assuming back to front ordering) - // PS -> SP - // PL -> LP - // PPL -> LPP - // PPS -> SPP - // - // The following cases need a second pass to handle the second skip or limit - // PLL -> LLP - // PPLL -> LLPP - // PLPL -> LLPP - i = sources.size(); // decremented before next pass - } -} - -void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) - return; - - for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { - DocumentSourceLimit* limit = dynamic_cast(sources[i].get()); - DocumentSourceSkip* skip = dynamic_cast(sources[i - 1].get()); - if (limit && skip) { - // Increase limit by skip since the skipped docs now pass through the $limit - limit->setLimit(limit->getLimit() + skip->getSkip()); - swap(sources[i], sources[i - 1]); - - // Start at back again. 
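
The arithmetic behind both the removed pass here and the new DocumentSourceSkip::optimizeAt(): skipping s documents and then taking n of them is equivalent to taking s + n documents and then skipping s. A self-contained check (Window and swapLimitBeforeSkip are invented names), matching the SkipSkipLimitBecomesLimitSkip test added to pipeline_test.cpp below, where {$skip: 3}, {$skip: 5}, {$limit: 5} collapses to {$limit: 13}, {$skip: 8}:

#include <cassert>

struct Window {
    long long skip;
    long long limit;
};

// Swap a $limit in front of a $skip without changing the result set.
Window swapLimitBeforeSkip(long long skip, long long limit) {
    return {skip, limit + skip};  // the skipped documents now pass through the $limit
}

int main() {
    // [{$skip: 8}, {$limit: 5}]  ->  [{$limit: 13}, {$skip: 8}]
    Window w = swapLimitBeforeSkip(8, 5);
    assert(w.skip == 8 && w.limit == 13);
    return 0;
}
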
This is needed to handle cases with more than 1 $limit - // (S means skip, L means limit) - // - // These two would work without second pass (assuming back to front ordering) - // SL -> LS - // SSL -> LSS - // - // The following cases need a second pass to handle the second limit - // SLL -> LLS - // SSLL -> LLSS - // SLSL -> LLSS - i = sources.size(); // decremented before next pass - } + while (itr != sources.end() && std::next(itr) != sources.end()) { + invariant((*itr).get()); + itr = (*itr).get()->optimizeAt(itr, &sources); } -} -void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.empty()) - return; - - // move all sources to a temporary list - SourceContainer tempSources; - sources.swap(tempSources); - - // move the first one to the final list - sources.push_back(tempSources[0]); - - // run through the sources, coalescing them or keeping them - for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) { - // If we can't coalesce the source with the last, then move it - // to the final list, and make it the new last. (If we succeeded, - // then we're still on the same last, and there's no need to move - // or do anything with the source -- the destruction of tempSources - // will take care of the rest.) - intrusive_ptr& pLastSource = sources.back(); - intrusive_ptr& pTemp = tempSources[tempi]; - verify(pTemp && pLastSource); - if (!pLastSource->coalesce(pTemp)) - sources.push_back(pTemp); - } -} - -void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - SourceContainer newSources; - for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) { - if (auto out = (*it)->optimize()) { - newSources.push_back(std::move(out)); - } - } - pipeline->sources = std::move(newSources); -} - -void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) { - SourceContainer& sources = pipeline->sources; - if (sources.size() >= 2 && dynamic_cast(sources[0].get())) { - if (DocumentSourceMatch* match = dynamic_cast(sources[1].get())) { - const BSONObj redactSafePortion = match->redactSafePortion(); - if (!redactSafePortion.isEmpty()) { - sources.push_front(DocumentSourceMatch::createFromBson( - BSON("$match" << redactSafePortion).firstElement(), pipeline->pCtx)); - } + // Once we have reached our final number of stages, optimize each individually. + for (auto&& source : sources) { + if (auto out = source->optimize()) { + optimizedSources.push_back(out); } } + sources.swap(optimizedSources); } Status Pipeline::checkAuthForCommand(ClientBasic* client, @@ -447,12 +324,11 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin // objects even though only a subset of fields are needed. // 2) Optimization IS NOT applied immediately following a $project or $group since it would // add an unnecessary project (and therefore a deep-copy). - for (size_t i = 0; i < shardPipe->sources.size(); i++) { - DepsTracker dt; // ignored - if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) + for (auto&& source : shardPipe->sources) { + DepsTracker dt; + if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) return; } - // if we get here, add the project. 
shardPipe->sources.push_back(DocumentSourceProject::createFromBson( BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx)); @@ -522,8 +398,7 @@ void Pipeline::stitch() { /* chain together the sources we found */ DocumentSource* prevSource = sources.front().get(); - for (SourceContainer::iterator iter(sources.begin() + 1), listEnd(sources.end()); - iter != listEnd; + for (SourceContainer::iterator iter(++sources.begin()), listEnd(sources.end()); iter != listEnd; ++iter) { intrusive_ptr pTemp(*iter); pTemp->setSource(prevSource); @@ -571,9 +446,9 @@ DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { DepsTracker deps; bool knowAllFields = false; bool knowAllMeta = false; - for (size_t i = 0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) { + for (auto&& source : sources) { DepsTracker localDeps; - DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps); + DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps); if (status == DocumentSource::NOT_SUPPORTED) { // Assume this stage needs everything. We may still know something about our @@ -595,6 +470,10 @@ DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; } + + if (knowAllMeta && knowAllFields) { + break; + } } if (!knowAllFields) diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 65ff1ad97e9..431933e159e 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -52,6 +52,8 @@ class Privilege; */ class Pipeline : public IntrusiveCounterUnsigned { public: + typedef std::list> SourceContainer; + /** * Create a pipeline from the command. * @@ -64,7 +66,7 @@ public: const BSONObj& cmdObj, const boost::intrusive_ptr& pCtx); - /// Helper to implement Command::checkAuthForCommand + // Helper to implement Command::checkAuthForCommand static Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj); @@ -94,6 +96,11 @@ public: */ bool needsPrimaryShardMerger() const; + /** + * Modifies the pipeline, optimizing it by combining and swapping stages. + */ + void optimizePipeline(); + /** * Returns any other collections involved in the pipeline in addition to the collection the * aggregation is run on. @@ -189,7 +196,6 @@ private: Pipeline(const boost::intrusive_ptr& pCtx); - typedef std::deque> SourceContainer; SourceContainer sources; bool explain; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 4cebf691c26..91be4e3a92e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -221,9 +221,9 @@ shared_ptr PipelineD::prepareCursorSource( Pipeline::SourceContainer& sources = pPipeline->sources; // Inject a MongodImplementation to sources that need them. - for (size_t i = 0; i < sources.size(); i++) { + for (auto&& source : sources) { DocumentSourceNeedsMongod* needsMongod = - dynamic_cast(sources[i].get()); + dynamic_cast(source.get()); if (needsMongod) { needsMongod->injectMongodInterface(std::make_shared(pExpCtx)); } @@ -434,11 +434,10 @@ shared_ptr PipelineD::addCursorSource(const intrusive_ptrsetProjection(deps.toProjection(), deps.toParsedDeps()); } - while (!pipeline->sources.empty() && pSource->coalesce(pipeline->sources.front())) { - pipeline->sources.pop_front(); - } - + // Add the initial DocumentSourceCursor to the front of the pipeline. 
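
One quietly important change in pipeline.h above: SourceContainer is now a std::list rather than a std::deque. optimizeAt() erases and inserts neighbors while holding an iterator into the container, and std::list keeps iterators to the remaining elements valid across such edits, which a deque does not. A standalone illustration:

#include <cassert>
#include <iterator>
#include <list>

int main() {
    // std::list iterators to other elements survive insert/erase, which is
    // what optimizeAt() relies on while rewriting around the position it holds.
    std::list<int> stages{1, 2, 3};
    auto itr = stages.begin();     // the stage currently being optimized
    stages.erase(std::next(itr));  // coalesce: drop the following stage
    stages.insert(itr, 0);         // or hoist a new stage in front of this one
    assert(*itr == 1);             // 'itr' is still valid and still points here
    assert((stages == std::list<int>{0, 1, 3}));
    return 0;
}
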
Then optimize again in + // case the new stage can be absorbed with the first stages of the pipeline. pipeline->addInitialSource(pSource); + pipeline->optimizePipeline(); // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We // deregister the PlanExecutor so that it can be registered with ClientCursor. diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h index 68763c2ac5c..af85fdff4b0 100644 --- a/src/mongo/db/pipeline/pipeline_optimizations.h +++ b/src/mongo/db/pipeline/pipeline_optimizations.h @@ -36,67 +36,6 @@ #include "mongo/db/pipeline/pipeline.h" namespace mongo { -/** - * This class holds optimizations applied to a single Pipeline. - * - * Each function has the same signature and takes a Pipeline as an in/out parameter. - */ -class Pipeline::Optimizations::Local { -public: - /** - * Moves matches before any adjacent sort phases. - * - * This means we sort fewer items. Neither sorts, nor matches (excluding $text) - * change the documents in the stream, so this transformation shouldn't affect - * the result. - */ - static void moveMatchBeforeSort(Pipeline* pipeline); - - /** - * Moves skip and limit before any adjacent project phases. - * - * While this is performance-neutral on its own, it enables other optimizations - * such as combining sort and limit. - */ - static void moveSkipAndLimitBeforeProject(Pipeline* pipeline); - - /** - * Moves limits before any adjacent skip phases. - * - * This is more optimal for sharding since currently, we can only split - * the pipeline at a single source and it is better to limit the results - * coming from each shard. This also enables other optimizations like - * coalescing the limit into a sort. - */ - static void moveLimitBeforeSkip(Pipeline* pipeline); - - /** - * Runs through the DocumentSources, and give each one the opportunity - * to coalesce with its successor. If successful, remove the successor. - * - * This should generally be run after optimizations that reorder stages - * to be most effective. - * - * NOTE: uses the DocumentSource::coalesce() method - */ - static void coalesceAdjacent(Pipeline* pipeline); - - /** - * Gives each DocumentSource the opportunity to optimize itself. - * - * NOTE: uses the DocumentSource::optimize() method - */ - static void optimizeEachDocumentSource(Pipeline* pipeline); - - /** - * Optimizes [$redact, $match] to [$match, $redact, $match] if possible. - * - * This gives us the ability to use indexes and reduce the number of - * BSONObjs converted to Documents. - */ - static void duplicateMatchBeforeInitalRedact(Pipeline* pipeline); -}; - /** * This class holds optimizations applied to a shard Pipeline and a merger Pipeline. 
* diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 333fc5963de..e936b895cca 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -99,7 +99,7 @@ class MoveLimitBeforeProject : public Base { } }; -class MoveMulitipleSkipsAndLimitsBeforeProject : public Base { +class MoveMultipleSkipsAndLimitsBeforeProject : public Base { string inputPipeJson() override { return "[{$project: {a : 1}}, {$limit : 5}, {$skip : 3}]"; } @@ -109,6 +109,21 @@ class MoveMulitipleSkipsAndLimitsBeforeProject : public Base { } }; +class SkipSkipLimitBecomesLimitSkip : public Base { + string inputPipeJson() override { + return "[{$skip : 3}" + ",{$skip : 5}" + ",{$limit: 5}" + "]"; + } + + string outputPipeJson() override { + return "[{$limit: 13}" + ",{$skip : 8}" + "]"; + } +}; + class SortMatchProjSkipLimBecomesMatchTopKSortSkipProj : public Base { string inputPipeJson() override { return "[{$sort: {a: 1}}" @@ -179,6 +194,16 @@ class DoNotRemoveNonEmptyMatch : public Base { } }; +class MoveMatchBeforeSort : public Base { + string inputPipeJson() override { + return "[{$sort: {b: 1}}, {$match: {a: 2}}]"; + } + + string outputPipeJson() override { + return "[{$match: {a: 2}}, {$sort: {sortKey: {b: 1}}}]"; + } +}; + class LookupShouldCoalesceWithUnwindOnAs : public Base { string inputPipeJson() { return "[{$lookup: {from : 'coll2', as : 'same', localField: 'left', foreignField: " @@ -234,6 +259,15 @@ class LookupShouldNotCoalesceWithUnwindNotOnAs : public Base { } }; +class MatchShouldDuplicateItselfBeforeRedact : public Base { + string inputPipeJson() { + return "[{$redact: '$$PRUNE'}, {$match: {a: 1, b:12}}]"; + } + string outputPipeJson() { + return "[{$match: {a: 1, b:12}}, {$redact: '$$PRUNE'}, {$match: {a: 1, b:12}}]"; + } +}; + } // namespace Local namespace Sharded { @@ -600,16 +634,19 @@ public: add(); add(); add(); - add(); + add(); + add(); add(); add(); add(); add(); + add(); add(); add(); add(); add(); add(); + add(); add(); add(); add
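
The MatchShouldDuplicateItselfBeforeRedact case above exercises the subtlest rule in this patch: the redact-safe portion of the $match is duplicated in front of the $redact so an index can serve it, without weakening redaction. The sketch below (a std::list of stage-name strings, not MongoDB code) shows the rewrite and why DocumentSourceRedact::optimizeAt() returns the position after itself; stepping back would find the same $redact/$match pair again and insert duplicated $match stages forever:

#include <iostream>
#include <iterator>
#include <list>
#include <string>

int main() {
    // [R, M] becomes [M', R, M]; optimization resumes past the $redact.
    std::list<std::string> pipeline{"$redact", "$match"};
    auto itr = pipeline.begin();                   // at the $redact
    auto resumeAt = std::next(itr);                // what the stage returns to the driver loop
    pipeline.insert(itr, "$match(safe portion)");  // duplicated predicate, as in the patch
    for (const auto& stage : pipeline) {
        std::cout << stage << ' ';  // $match(safe portion) $redact $match
    }
    std::cout << '\n' << *resumeAt << '\n';  // $match -- the trailing one, not the new copy
}
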