Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/pipeline/document_source.cpp          |   4
-rw-r--r--  src/mongo/db/pipeline/document_source.h            | 375
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp   |  29
-rw-r--r--  src/mongo/db/pipeline/document_source_geo_near.cpp |  24
-rw-r--r--  src/mongo/db/pipeline/document_source_limit.cpp    |  19
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp   |  25
-rw-r--r--  src/mongo/db/pipeline/document_source_match.cpp    |  40
-rw-r--r--  src/mongo/db/pipeline/document_source_project.cpp  |  16
-rw-r--r--  src/mongo/db/pipeline/document_source_redact.cpp   |  26
-rw-r--r--  src/mongo/db/pipeline/document_source_skip.cpp     |  33
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.cpp     |  34
-rw-r--r--  src/mongo/db/pipeline/document_source_test.cpp     | 118
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                 | 165
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                   |  10
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp               |  11
-rw-r--r--  src/mongo/db/pipeline/pipeline_optimizations.h     |  61
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp            |  41
17 files changed, 554 insertions(+), 477 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 803d19c1884..da3ec5fb33c 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -83,10 +83,6 @@ void DocumentSource::setSource(DocumentSource* pTheSource) {
pSource = pTheSource;
}
-bool DocumentSource::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- return false;
-}
-
intrusive_ptr<DocumentSource> DocumentSource::optimize() {
return this;
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index e5c7f40718a..b40a4716cbb 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -50,6 +50,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/sorter/sorter.h"
#include "mongo/stdx/functional.h"
@@ -62,6 +63,7 @@ class Expression;
class ExpressionFieldPath;
class ExpressionObject;
class DocumentSourceLimit;
+class DocumentSourceSort;
class PlanExecutor;
class RecordCursor;
@@ -126,26 +128,10 @@ public:
virtual void setSource(DocumentSource* pSource);
/**
- Attempt to coalesce this DocumentSource with its successor in the
- document processing pipeline. If successful, the successor
- DocumentSource should be removed from the pipeline and discarded.
-
- If successful, this operation can be applied repeatedly, in an
- attempt to coalesce several sources together.
-
- The default implementation is to do nothing, and return false.
-
- @param pNextSource the next source in the document processing chain.
- @returns whether or not the attempt to coalesce was successful or not;
- if the attempt was not successful, nothing has been changed
- */
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
-
- /**
* Returns an optimized DocumentSource that is semantically equivalent to this one, or
* nullptr if this stage is a no-op. Implementations are allowed to modify themselves
- * in-place and return a pointer to themselves. For best results, first coalesce compatible
- * sources using coalesce().
+ * in-place and return a pointer to themselves. For best results, first optimize the pipeline
+ * with the optimizePipeline() method defined in pipeline.cpp.
*
* This is intended for any operations that include expressions, and provides a hook for
* those to optimize those operations.
@@ -154,6 +140,22 @@ public:
*/
virtual boost::intrusive_ptr<DocumentSource> optimize();
+ /**
+ * Attempt to perform an optimization with the following source in the pipeline. 'container'
+ * refers to the entire pipeline, and 'itr' points to this stage within the pipeline.
+ *
+ * The return value is an iterator over the same container which points to the first location
+ * in the container at which an optimization may be possible.
+ *
+ * For example, if a swap takes place, the returned iterator should just be the position
+ * directly preceding 'itr', if such a position exists, since the stage at that position may be
+ * able to perform further optimizations with its new neighbor.
+ */
+ virtual Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ return std::next(itr);
+ }
+
enum GetDepsReturn {
NOT_SUPPORTED = 0x0, // The full object and all metadata may be required
SEE_NEXT = 0x1, // Later stages could need either fields or metadata
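
A minimal sketch of the optimizeAt() contract described above (not part of this commit; 'DocumentSourceFoo' and its 'absorb()' helper are hypothetical):

    Pipeline::SourceContainer::iterator DocumentSourceFoo::optimizeAt(
        Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
        invariant(*itr == this);
        // If the next stage is also a $foo, merge it into this one and retry at
        // the same position, since the new neighbor may allow further merging.
        if (auto next = dynamic_cast<DocumentSourceFoo*>((*std::next(itr)).get())) {
            absorb(next);  // hypothetical helper that merges the two stages
            container->erase(std::next(itr));
            return itr;
        }
        return std::next(itr);  // nothing to do; let the next stage try
    }
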
@@ -321,8 +323,12 @@ public:
~DocumentSourceCursor() final;
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
+ /**
+ * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
Value serialize(bool explain = false) const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource) final;
bool isValidInitialSource() const final {
return true;
}
@@ -561,9 +567,14 @@ public:
// virtuals from DocumentSource
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource) final;
Value serialize(bool explain = false) const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
+ /**
+ * Attempts to combine with any subsequent $match stages, joining the query objects with a
+ * $and.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
void setSource(DocumentSource* Source) final;
/**
@@ -750,6 +761,12 @@ public:
// virtuals from DocumentSource
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
+ /**
+ * Attempt to move a subsequent $skip or $limit stage before the $project, thus reducing the
+ * number of documents that pass through this stage.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
boost::intrusive_ptr<DocumentSource> optimize() final;
Value serialize(bool explain = false) const final;
@@ -789,6 +806,13 @@ public:
const char* getSourceName() const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
+ /**
+ * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
+ * stage.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -807,139 +831,6 @@ private:
boost::intrusive_ptr<Expression> _expression;
};
-class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
-public:
- // virtuals from DocumentSource
- boost::optional<Document> getNext() final;
- const char* getSourceName() const final;
- void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
- void dispose() final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- boost::intrusive_ptr<DocumentSource> getMergeSource() final;
-
- /**
- Add sort key field.
-
- Adds a sort key field to the key being built up. A concatenated
- key is built up by calling this repeatedly.
-
- @param fieldPath the field path to the key component
- @param ascending if true, use the key for an ascending sort,
- otherwise, use it for descending
- */
- void addKey(const std::string& fieldPath, bool ascending);
-
- /// Write out a Document whose contents are the sort key.
- Document serializeSortKey(bool explain) const;
-
- /**
- Create a sorting DocumentSource from BSON.
-
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $group.
-
- @param pBsonElement the BSONELement that defines the group
- @param pExpCtx the expression context for the pipeline
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /// Create a DocumentSourceSort with a given sort and (optional) limit
- static boost::intrusive_ptr<DocumentSourceSort> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- BSONObj sortOrder,
- long long limit = -1);
-
- /// returns -1 for no limit
- long long getLimit() const;
-
- /**
- * Loads a document to be sorted. This can be used to sort a stream of documents that are not
- * coming from another DocumentSource. Once all documents have been added, the caller must call
- * loadingDone() before using getNext() to receive the documents in sorted order.
- */
- void loadDocument(const Document& doc);
-
- /**
- * Signals to the sort stage that there will be no more input documents. It is an error to call
- * loadDocument() once this method returns.
- */
- void loadingDone();
-
- /**
- * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
- * have already been sorted.
- */
- void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
-
- bool isPopulated() {
- return populated;
- };
-
- boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
- return limitSrc;
- }
-
-private:
- explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- Value serialize(bool explain = false) const final {
- verify(false); // should call addToBsonArray instead
- }
-
- /*
- Before returning anything, this source must fetch everything from
- the underlying source and group it. populate() is used to do that
- on the first call to any method on this source. The populated
- boolean indicates that this has been done.
- */
- void populate();
- bool populated;
-
- SortOptions makeSortOptions() const;
-
- // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
- class IteratorFromCursor;
-
- /* these two parallel each other */
- typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
- SortKey vSortKey;
- std::vector<char> vAscending; // used like std::vector<bool> but without specialization
-
- /// Extracts the fields in vSortKey from the Document;
- Value extractKey(const Document& d) const;
-
- /// Compare two Values according to the specified sort key.
- int compare(const Value& lhs, const Value& rhs) const;
-
- typedef Sorter<Value, Document> MySorter;
-
- // For MySorter
- class Comparator {
- public:
- explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
- int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
- return _source.compare(lhs.first, rhs.first);
- }
-
- private:
- const DocumentSourceSort& _source;
- };
-
- boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
-
- bool _done;
- bool _mergingPresorted;
- std::unique_ptr<MySorter> _sorter;
- std::unique_ptr<MySorter::Iterator> _output;
-};
-
class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource {
public:
boost::optional<Document> getNext() final;
@@ -1022,7 +913,11 @@ public:
// virtuals from DocumentSource
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
+ /**
+ * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
Value serialize(bool explain = false) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final {
@@ -1075,12 +970,166 @@ private:
long long count;
};
+class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ boost::optional<Document> getNext() final;
+ const char* getSourceName() const final;
+ void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+ /**
+ * Attempts to move a subsequent $match stage before the $sort, reducing the number of
+ * documents that pass through the stage. Also attempts to absorb a subsequent $limit stage so
+ * that it can perform a top-k sort.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ void dispose() final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+ /**
+ Add sort key field.
+
+ Adds a sort key field to the key being built up. A concatenated
+ key is built up by calling this repeatedly.
+
+ @param fieldPath the field path to the key component
+ @param ascending if true, use the key for an ascending sort,
+ otherwise, use it for descending
+ */
+ void addKey(const std::string& fieldPath, bool ascending);
+
+ /// Write out a Document whose contents are the sort key.
+ Document serializeSortKey(bool explain) const;
+
+ /**
+ Create a sorting DocumentSource from BSON.
+
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been determined to be an Object with an
+ element named $sort.
+
+ @param pBsonElement the BSONElement that defines the sort
+ @param pExpCtx the expression context for the pipeline
+ @returns the sorting DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /// Create a DocumentSourceSort with a given sort and (optional) limit
+ static boost::intrusive_ptr<DocumentSourceSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ BSONObj sortOrder,
+ long long limit = -1);
+
+ /// returns -1 for no limit
+ long long getLimit() const;
+
+ /**
+ * Loads a document to be sorted. This can be used to sort a stream of documents that are not
+ * coming from another DocumentSource. Once all documents have been added, the caller must call
+ * loadingDone() before using getNext() to receive the documents in sorted order.
+ */
+ void loadDocument(const Document& doc);
+
+ /**
+ * Signals to the sort stage that there will be no more input documents. It is an error to call
+ * loadDocument() once this method returns.
+ */
+ void loadingDone();
+
+ /**
+ * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
+ * have already been sorted.
+ */
+ void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
+
+ bool isPopulated() {
+ return populated;
+ };
+
+ boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
+ return limitSrc;
+ }
+
+private:
+ explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(bool explain = false) const final {
+ verify(false); // should call serializeToArray() instead
+ }
+
+ /*
+ Before returning anything, this source must fetch everything from
+ the underlying source and sort it. populate() is used to do that
+ on the first call to any method on this source. The populated
+ boolean indicates that this has been done.
+ */
+ void populate();
+ bool populated;
+
+ SortOptions makeSortOptions() const;
+
+ // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
+ class IteratorFromCursor;
+
+ /* these two parallel each other */
+ typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
+ SortKey vSortKey;
+ std::vector<char> vAscending; // used like std::vector<bool> but without specialization
+
+ /// Extracts the fields in vSortKey from the Document.
+ Value extractKey(const Document& d) const;
+
+ /// Compare two Values according to the specified sort key.
+ int compare(const Value& lhs, const Value& rhs) const;
+
+ typedef Sorter<Value, Document> MySorter;
+
+ /**
+ * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep
+ * the smallest limit.
+ */
+ void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) {
+ if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) {
+ limitSrc = limit;
+ }
+ }
+
+ // For MySorter
+ class Comparator {
+ public:
+ explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
+ int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
+ return _source.compare(lhs.first, rhs.first);
+ }
+
+ private:
+ const DocumentSourceSort& _source;
+ };
+
+ boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
+
+ bool _done;
+ bool _mergingPresorted;
+ std::unique_ptr<MySorter> _sorter;
+ std::unique_ptr<MySorter::Iterator> _output;
+};
+
class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource {
public:
// virtuals from DocumentSource
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
+ /**
+ * Attempts to move a subsequent $limit before the $skip, potentially allowing for further
+ * optimizations earlier in the pipeline.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
Value serialize(bool explain = false) const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
@@ -1192,10 +1241,17 @@ class DocumentSourceGeoNear : public DocumentSource,
public SplittableDocumentSource,
public DocumentSourceNeedsMongod {
public:
+ static const long long kDefaultLimit;
+
// virtuals from DocumentSource
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
+ /**
+ * Attempts to combine with a subsequent limit stage, setting the internal limit field
+ * as a result.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
bool isValidInitialSource() const final {
return true;
}
@@ -1253,8 +1309,13 @@ class DocumentSourceLookUp final : public DocumentSource,
public:
boost::optional<Document> getNext() final;
const char* getSourceName() const final;
- bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource) final;
void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+ /**
+ * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc'
+ * field.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
void dispose() final;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 65cae6448ce..e851f14cf93 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -139,21 +139,26 @@ long long DocumentSourceCursor::getLimit() const {
return _limit ? _limit->getLimit() : -1;
}
-bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) {
- // Note: Currently we assume the $limit is logically after any $sort or
- // $match. If we ever pull in $match or $sort using this method, we
- // will need to keep track of the order of the sub-stages.
-
- if (!_limit) {
- _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get());
- return _limit.get(); // false if next is not a $limit
- } else {
- return _limit->coalesce(nextSource);
- }
+Pipeline::SourceContainer::iterator DocumentSourceCursor::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
- return false;
+ if (nextLimit) {
+ if (_limit) {
+ // We already have an internal limit, set it to the more restrictive of the two.
+ _limit->setLimit(std::min(_limit->getLimit(), nextLimit->getLimit()));
+ } else {
+ _limit = nextLimit;
+ }
+ container->erase(std::next(itr));
+ return itr;
+ }
+ return std::next(itr);
}
+
Value DocumentSourceCursor::serialize(bool explain) const {
// we never parse a documentSourceCursor, so we only serialize for explain
if (!explain)
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 2279eb88261..da30c2adaa3 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -37,10 +37,11 @@
namespace mongo {
using boost::intrusive_ptr;
-using std::min;
REGISTER_DOCUMENT_SOURCE(geoNear, DocumentSourceGeoNear::createFromBson);
+const long long DocumentSourceGeoNear::kDefaultLimit = 100;
+
const char* DocumentSourceGeoNear::getSourceName() const {
return "$geoNear";
}
@@ -66,14 +67,19 @@ boost::optional<Document> DocumentSourceGeoNear::getNext() {
return output.freeze();
}
-bool DocumentSourceGeoNear::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- DocumentSourceLimit* limitSrc = dynamic_cast<DocumentSourceLimit*>(pNextSource.get());
- if (limitSrc) {
- limit = min(limit, limitSrc->getLimit());
- return true;
- }
+Pipeline::SourceContainer::iterator DocumentSourceGeoNear::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
- return false;
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
+
+ if (nextLimit) {
+ // If the next stage is a $limit, we can combine it with ourselves.
+ limit = std::min(limit, nextLimit->getLimit());
+ container->erase(std::next(itr));
+ return itr;
+ }
+ return std::next(itr);
}
// This command is sent as-is to the shards.
@@ -219,7 +225,7 @@ void DocumentSourceGeoNear::parseOptions(BSONObj options) {
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx),
coordsIsArray(false),
- limit(100),
+ limit(DocumentSourceGeoNear::kDefaultLimit),
maxDistance(-1.0),
minDistance(-1.0),
spherical(false),
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index b7e09785734..401afc945ff 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -49,17 +49,18 @@ const char* DocumentSourceLimit::getSourceName() const {
return "$limit";
}
-bool DocumentSourceLimit::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- DocumentSourceLimit* pLimit = dynamic_cast<DocumentSourceLimit*>(pNextSource.get());
+Pipeline::SourceContainer::iterator DocumentSourceLimit::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
- /* if it's not another $limit, we can't coalesce */
- if (!pLimit)
- return false;
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
- /* we need to limit by the minimum of the two limits */
- if (pLimit->limit < limit)
- limit = pLimit->limit;
- return true;
+ if (nextLimit) {
+ limit = std::min(limit, nextLimit->limit);
+ container->erase(std::next(itr));
+ return itr;
+ }
+ return std::next(itr);
}
boost::optional<Document> DocumentSourceLimit::getNext() {
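
A standalone model of the erase-and-retry pattern used above (illustrative only, not MongoDB code): adjacent limits collapse to the more restrictive value until a single stage remains.

    #include <algorithm>
    #include <cassert>
    #include <list>

    int main() {
        std::list<long long> limits{10, 5, 7};  // three adjacent $limit values
        auto itr = limits.begin();
        while (std::next(itr) != limits.end()) {
            *itr = std::min(*itr, *std::next(itr));  // keep the smaller limit
            limits.erase(std::next(itr));            // drop the absorbed stage
        }
        assert(limits.size() == 1 && limits.front() == 5);
        return 0;
    }
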
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 83708c8756c..f9927de1e6f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -92,18 +92,21 @@ boost::optional<Document> DocumentSourceLookUp::getNext() {
return output.freeze();
}
-bool DocumentSourceLookUp::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- if (_handlingUnwind) {
- return false;
- }
-
- auto unwindSrc = dynamic_cast<DocumentSourceUnwind*>(pNextSource.get());
- if (!unwindSrc || unwindSrc->getUnwindPath() != _as.getPath(false)) {
- return false;
+Pipeline::SourceContainer::iterator DocumentSourceLookUp::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
+
+ // If we are not already handling an $unwind stage internally, we can combine with the
+ // following $unwind stage.
+ if (nextUnwind && !_handlingUnwind && nextUnwind->getUnwindPath() == _as.getPath(false)) {
+ _unwindSrc = std::move(nextUnwind);
+ _handlingUnwind = true;
+ container->erase(std::next(itr));
+ return itr;
}
- _unwindSrc = std::move(unwindSrc);
- _handlingUnwind = true;
- return true;
+ return std::next(itr);
}
void DocumentSourceLookUp::dispose() {
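
The net effect, shaped like the $lookup/$unwind tests later in this diff (field names illustrative):

    // [{$lookup: {..., as: 'same'}}, {$unwind: '$same'}]
    //     -> a single $lookup that unwinds internally (_handlingUnwind == true)
    //
    // [{$lookup: {..., as: 'same'}}, {$unwind: '$from'}]
    //     -> unchanged: the $unwind path must equal the $lookup's 'as' path.
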
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 0458a7474be..f44c3f9dca3 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -74,32 +74,22 @@ boost::optional<Document> DocumentSourceMatch::getNext() {
return boost::none;
}
-bool DocumentSourceMatch::coalesce(const intrusive_ptr<DocumentSource>& nextSource) {
- DocumentSourceMatch* otherMatch = dynamic_cast<DocumentSourceMatch*>(nextSource.get());
- if (!otherMatch)
- return false;
-
- if (otherMatch->_isTextQuery) {
- // Non-initial text queries are disallowed (enforced by setSource below). This prevents
- // "hiding" a non-initial text query by combining it with another match.
- return false;
-
- // The rest of this block is for once we support non-initial text queries.
-
- if (_isTextQuery) {
- // The score should only come from the last $match. We can't combine since then this
- // match's score would impact otherMatch's.
- return false;
- }
-
- _isTextQuery = true;
+Pipeline::SourceContainer::iterator DocumentSourceMatch::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
+
+ // Since a text search must use an index, it must be the first stage in the pipeline. We cannot
+ // combine a non-text stage with a text stage, as that may turn an invalid pipeline into a
+ // valid one, unbeknownst to the user.
+ if (nextMatch && !nextMatch->_isTextQuery) {
+ matcher.reset(new Matcher(BSON("$and" << BSON_ARRAY(getQuery() << nextMatch->getQuery())),
+ ExtensionsCallbackNoop()));
+ container->erase(std::next(itr));
+ return itr;
}
-
- // Replace our matcher with the $and of ours and theirs.
- matcher.reset(new Matcher(BSON("$and" << BSON_ARRAY(getQuery() << otherMatch->getQuery())),
- ExtensionsCallbackNoop()));
-
- return true;
+ return std::next(itr);
}
namespace {
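
The net effect on two adjacent non-text matches, using the same query shapes as the tests later in this diff:

    // [{$match: {a: 1}}, {$match: {b: 1}}]
    //     -> [{$match: {$and: [{a: 1}, {b: 1}]}}]
    //
    // A $text $match is never merged: $text must stay the first stage of the
    // pipeline, so folding it into a later $match could make an invalid
    // pipeline appear valid.
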
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index a0a81df8610..462621bc79e 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -83,6 +83,22 @@ intrusive_ptr<DocumentSource> DocumentSourceProject::optimize() {
return this;
}
+Pipeline::SourceContainer::iterator DocumentSourceProject::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get());
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
+
+ if (nextSkip || nextLimit) {
+ // Swap the $limit/$skip before ourselves, thus reducing the number of documents that
+ // pass through the $project.
+ std::swap(*itr, *std::next(itr));
+ return itr == container->begin() ? itr : std::prev(itr);
+ }
+ return std::next(itr);
+}
+
Value DocumentSourceProject::serialize(bool explain) const {
return Value(DOC(getSourceName() << pEO->serialize(explain)));
}
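
A worked trace (illustrative, not from this commit) of why the swap steps the returned iterator back. P = $project, S = $skip:

    // [P, S1, S2]   P swaps with S1; itr is at the front, so return itr
    //     -> [S1, P, S2]
    // S1 cannot combine with P; advance to P
    // P swaps with S2; return std::prev(itr), which now points at S1
    //     -> [S1, S2, P]
    // S1 now sees S2, and DocumentSourceSkip::optimizeAt() merges the two
    //     -> [S1+2, P]
    //
    // Without stepping back, the two $skips would not recombine in this pass.
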
diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp
index 7067d27930a..b1d2c9e54f2 100644
--- a/src/mongo/db/pipeline/document_source_redact.cpp
+++ b/src/mongo/db/pipeline/document_source_redact.cpp
@@ -68,6 +68,32 @@ boost::optional<Document> DocumentSourceRedact::getNext() {
return boost::none;
}
+Pipeline::SourceContainer::iterator DocumentSourceRedact::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
+
+ if (nextMatch) {
+ const BSONObj redactSafePortion = nextMatch->redactSafePortion();
+
+ if (!redactSafePortion.isEmpty()) {
+ // Because R-M turns into M-R-M without modifying the original $match, we cannot step
+ // backwards and optimize from before the $redact, otherwise this will just loop and
+ // create an infinite number of $matches.
+ Pipeline::SourceContainer::iterator returnItr = std::next(itr);
+
+ container->insert(
+ itr,
+ DocumentSourceMatch::createFromBson(
+ BSON("$match" << redactSafePortion).firstElement(), this->pExpCtx));
+
+ return returnItr;
+ }
+ }
+ return std::next(itr);
+}
+
Value DocumentSourceRedact::redactValue(const Value& in) {
const BSONType valueType = in.getType();
if (valueType == Object) {
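
A worked trace of the transformation, matching the MatchShouldDuplicateItselfBeforeRedact test later in this diff. R = $redact, M = $match:

    // [R, M]  ->  [M', R, M]   where M' is built from M's redact-safe portion
    //
    // The returned iterator is std::next(itr) (the original M), not the new
    // front: stepping back to M' would let it see [R, M] again and insert yet
    // another copy, looping forever.
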
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index ff2e5d37161..4b3d4ec168d 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -48,18 +48,6 @@ const char* DocumentSourceSkip::getSourceName() const {
return "$skip";
}
-bool DocumentSourceSkip::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- DocumentSourceSkip* pSkip = dynamic_cast<DocumentSourceSkip*>(pNextSource.get());
-
- /* if it's not another $skip, we can't coalesce */
- if (!pSkip)
- return false;
-
- /* we need to skip over the sum of the two consecutive $skips */
- _skip += pSkip->_skip;
- return true;
-}
-
boost::optional<Document> DocumentSourceSkip::getNext() {
pExpCtx->checkForInterrupt();
@@ -82,6 +70,27 @@ intrusive_ptr<DocumentSource> DocumentSourceSkip::optimize() {
return _skip == 0 ? nullptr : this;
}
+Pipeline::SourceContainer::iterator DocumentSourceSkip::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
+ auto nextSkip = dynamic_cast<DocumentSourceSkip*>((*std::next(itr)).get());
+
+ if (nextLimit) {
+ // Swap the $limit before this stage, allowing a top-k sort to be possible, provided there
+ // is a $sort stage.
+ nextLimit->setLimit(nextLimit->getLimit() + _skip);
+ std::swap(*itr, *std::next(itr));
+ return itr == container->begin() ? itr : std::prev(itr);
+ } else if (nextSkip) {
+ _skip += nextSkip->getSkip();
+ container->erase(std::next(itr));
+ return itr;
+ }
+ return std::next(itr);
+}
+
intrusive_ptr<DocumentSourceSkip> DocumentSourceSkip::create(
const intrusive_ptr<ExpressionContext>& pExpCtx) {
intrusive_ptr<DocumentSourceSkip> pSource(new DocumentSourceSkip(pExpCtx));
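
A standalone check (illustrative only, not MongoDB code) that the swap above preserves the result set, which is why the new limit is set to limit + skip:

    #include <cassert>
    #include <numeric>
    #include <vector>

    int main() {
        std::vector<int> docs(100);
        std::iota(docs.begin(), docs.end(), 1);  // documents 1..100

        // {$skip: 8}, {$limit: 5} selects documents 9..13.
        std::vector<int> skipThenLimit(docs.begin() + 8, docs.begin() + 8 + 5);

        // {$limit: 13}, {$skip: 8} (13 == 5 + 8) selects the same documents.
        std::vector<int> limited(docs.begin(), docs.begin() + 13);
        std::vector<int> limitThenSkip(limited.begin() + 8, limited.end());

        assert(skipThenLimit == limitThenSkip);
        return 0;
    }
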
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index df128d4472e..af32cde08d9 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -101,15 +101,6 @@ long long DocumentSourceSort::getLimit() const {
return limitSrc ? limitSrc->getLimit() : -1;
}
-bool DocumentSourceSort::coalesce(const intrusive_ptr<DocumentSource>& pNextSource) {
- if (!limitSrc) {
- limitSrc = dynamic_cast<DocumentSourceLimit*>(pNextSource.get());
- return limitSrc.get(); // false if next is not a $limit
- } else {
- return limitSrc->coalesce(pNextSource);
- }
-}
-
void DocumentSourceSort::addKey(const string& fieldPath, bool ascending) {
VariablesIdGenerator idGenerator;
VariablesParseState vps(&idGenerator);
@@ -139,6 +130,27 @@ Document DocumentSourceSort::serializeSortKey(bool explain) const {
return keyObj.freeze();
}
+Pipeline::SourceContainer::iterator DocumentSourceSort::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
+ auto nextLimit = dynamic_cast<DocumentSourceLimit*>((*std::next(itr)).get());
+
+ if (nextLimit) {
+ // If the following stage is a $limit, we can combine it with ourselves.
+ setLimitSrc(nextLimit);
+ container->erase(std::next(itr));
+ return itr;
+ } else if (nextMatch && !nextMatch->isTextQuery()) {
+ // Swap the $match before the $sort, thus reducing the number of documents that pass into
+ // this stage.
+ std::swap(*itr, *std::next(itr));
+ return itr == container->begin() ? itr : std::prev(itr);
+ }
+ return std::next(itr);
+}
+
DocumentSource::GetDepsReturn DocumentSourceSort::getDependencies(DepsTracker* deps) const {
for (size_t i = 0; i < vSortKey.size(); ++i) {
vSortKey[i]->addDependencies(deps);
@@ -201,9 +213,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
uassert(15976, "$sort stage must have at least one sort key", !pSort->vSortKey.empty());
if (limit > 0) {
- bool coalesced = pSort->coalesce(DocumentSourceLimit::create(pExpCtx, limit));
- verify(coalesced); // should always coalesce
- verify(pSort->getLimit() == limit);
+ pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit));
}
return pSort;
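
The effect of absorbing a $limit (stage shapes only, illustrative):

    // [$sort: {a: 1}, $limit: 10, $limit: 5]
    //     -> [$sort(limit: 10), $limit: 5]    first optimizeAt() call
    //     -> [$sort(limit: 5)]                setLimitSrc() keeps the minimum
    //
    // With a limit absorbed, the sorter only has to retain the current top 5
    // documents instead of buffering and sorting its entire input.
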
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index bdf9ac79a39..c34cbe6d457 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/stdx/memory.h"
@@ -215,6 +216,46 @@ TEST(Mock, Empty) {
} // namespace Mock
+namespace DocumentSourceRedact {
+using mongo::DocumentSourceRedact;
+using mongo::DocumentSourceMatch;
+using mongo::DocumentSourceMock;
+
+class Base : public Mock::Base {
+protected:
+ void createRedact() {
+ BSONObj spec = BSON("$redact"
+ << "$$PRUNE");
+ _redact = DocumentSourceRedact::createFromBson(spec.firstElement(), ctx());
+ }
+
+ DocumentSource* redact() {
+ return _redact.get();
+ }
+
+private:
+ intrusive_ptr<DocumentSource> _redact;
+};
+
+class PromoteMatch : public Base {
+public:
+ void run() {
+ createRedact();
+
+ auto match = DocumentSourceMatch::createFromBson(
+ BSON("$match" << BSON("a" << 1)).firstElement(), ctx());
+
+ Pipeline::SourceContainer pipeline;
+ pipeline.push_back(redact());
+ pipeline.push_back(match);
+
+ pipeline.front()->optimizeAt(pipeline.begin(), &pipeline);
+
+ ASSERT_EQUALS(pipeline.size(), 3);
+ ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline.front().get()));
+ }
+};
+} // namespace DocumentSourceRedact
+
namespace DocumentSourceLimit {
using mongo::DocumentSourceLimit;
@@ -251,6 +292,25 @@ public:
}
};
+/** Combine two $limit stages. */
+class CombineLimit : public Base {
+public:
+ void run() {
+ Pipeline::SourceContainer container;
+ createLimit(10);
+
+ auto secondLimit =
+ DocumentSourceLimit::createFromBson(BSON("$limit" << 5).firstElement(), ctx());
+
+ container.push_back(limit());
+ container.push_back(secondLimit);
+
+ limit()->optimizeAt(container.begin(), &container);
+ ASSERT_EQUALS(5, static_cast<DocumentSourceLimit*>(limit())->getLimit());
+ ASSERT_EQUALS(1, container.size());
+ }
+};
+
/** Exhausting a DocumentSourceLimit disposes of the pipeline's source. */
class DisposeSourceCascade : public Base {
public:
@@ -1402,6 +1462,9 @@ public:
createSort(BSON("a" << 1));
ASSERT_EQUALS(sort()->getLimit(), -1);
+ Pipeline::SourceContainer container;
+ container.push_back(sort());
+
{ // pre-limit checks
vector<Value> arr;
sort()->serializeToArray(arr);
@@ -1411,12 +1474,22 @@ public:
ASSERT(sort()->getMergeSource() != NULL);
}
- ASSERT_TRUE(sort()->coalesce(mkLimit(10)));
+ container.push_back(mkLimit(10));
+ sort()->optimizeAt(container.begin(), &container);
+ ASSERT_EQUALS(container.size(), 1);
ASSERT_EQUALS(sort()->getLimit(), 10);
- ASSERT_TRUE(sort()->coalesce(mkLimit(15)));
- ASSERT_EQUALS(sort()->getLimit(), 10); // unchanged
- ASSERT_TRUE(sort()->coalesce(mkLimit(5)));
- ASSERT_EQUALS(sort()->getLimit(), 5); // reduced
+
+ // unchanged
+ container.push_back(mkLimit(15));
+ sort()->optimizeAt(container.begin(), &container);
+ ASSERT_EQUALS(container.size(), 1);
+ ASSERT_EQUALS(sort()->getLimit(), 10);
+
+ // reduced
+ container.push_back(mkLimit(5));
+ sort()->optimizeAt(container.begin(), &container);
+ ASSERT_EQUALS(container.size(), 1);
+ ASSERT_EQUALS(sort()->getLimit(), 5);
vector<Value> arr;
sort()->serializeToArray(arr);
@@ -2477,15 +2550,27 @@ public:
void run() {
intrusive_ptr<DocumentSourceGeoNear> geoNear = DocumentSourceGeoNear::create(ctx());
- ASSERT_EQUALS(geoNear->getLimit(), 100);
+ Pipeline::SourceContainer container;
+ container.push_back(geoNear);
+
+ ASSERT_EQUALS(geoNear->getLimit(), DocumentSourceGeoNear::kDefaultLimit);
+
+ container.push_back(DocumentSourceLimit::create(ctx(), 200));
+ geoNear->optimizeAt(container.begin(), &container);
- ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 200)));
- ASSERT_EQUALS(geoNear->getLimit(), 100);
+ ASSERT_EQUALS(container.size(), 1);
+ ASSERT_EQUALS(geoNear->getLimit(), DocumentSourceGeoNear::kDefaultLimit);
- ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 50)));
+ container.push_back(DocumentSourceLimit::create(ctx(), 50));
+ geoNear->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(container.size(), 1);
ASSERT_EQUALS(geoNear->getLimit(), 50);
- ASSERT(geoNear->coalesce(DocumentSourceLimit::create(ctx(), 30)));
+ container.push_back(DocumentSourceLimit::create(ctx(), 30));
+ geoNear->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(container.size(), 1);
ASSERT_EQUALS(geoNear->getLimit(), 30);
}
};
@@ -2644,15 +2729,23 @@ public:
intrusive_ptr<DocumentSourceMatch> match2 = makeMatch(BSON("b" << 1));
intrusive_ptr<DocumentSourceMatch> match3 = makeMatch(BSON("c" << 1));
+ Pipeline::SourceContainer container;
+
// Check initial state
ASSERT_EQUALS(match1->getQuery(), BSON("a" << 1));
ASSERT_EQUALS(match2->getQuery(), BSON("b" << 1));
ASSERT_EQUALS(match3->getQuery(), BSON("c" << 1));
- ASSERT(match1->coalesce(match2));
+ container.push_back(match1);
+ container.push_back(match2);
+ match1->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(container.size(), 1);
ASSERT_EQUALS(match1->getQuery(), fromjson("{'$and': [{a:1}, {b:1}]}"));
- ASSERT(match1->coalesce(match3));
+ container.push_back(match3);
+ match1->optimizeAt(container.begin(), &container);
+ ASSERT_EQUALS(container.size(), 1);
ASSERT_EQUALS(match1->getQuery(),
fromjson(
"{'$and': [{'$and': [{a:1}, {b:1}]},"
@@ -2668,6 +2761,7 @@ public:
add<DocumentSourceClass::Deps>();
add<DocumentSourceLimit::DisposeSource>();
+ add<DocumentSourceLimit::CombineLimit>();
add<DocumentSourceLimit::DisposeSourceCascade>();
add<DocumentSourceLimit::Dependencies>();
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index e237f35a61e..e4a9e7d1e1d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -173,151 +173,28 @@ intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
}
}
- // The order in which optimizations are applied can have significant impact on the
- // efficiency of the final pipeline. Be Careful!
- Optimizations::Local::moveMatchBeforeSort(pPipeline.get());
- Optimizations::Local::moveSkipAndLimitBeforeProject(pPipeline.get());
- Optimizations::Local::moveLimitBeforeSkip(pPipeline.get());
- Optimizations::Local::coalesceAdjacent(pPipeline.get());
- Optimizations::Local::optimizeEachDocumentSource(pPipeline.get());
- Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get());
+ pPipeline->optimizePipeline();
return pPipeline;
}
-void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) {
- // TODO Keep moving matches across multiple sorts as moveLimitBeforeSkip does below.
- // TODO Check sort for limit. Not an issue currently due to order optimizations are applied,
- // but should be fixed.
- SourceContainer& sources = pipeline->sources;
- for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) {
- intrusive_ptr<DocumentSource>& pSource = sources[srci];
- DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(pSource.get());
- if (match && !match->isTextQuery()) {
- intrusive_ptr<DocumentSource>& pPrevious = sources[srci - 1];
- if (dynamic_cast<DocumentSourceSort*>(pPrevious.get())) {
- /* swap this item with the previous */
- intrusive_ptr<DocumentSource> pTemp(pPrevious);
- pPrevious = pSource;
- pSource = pTemp;
- }
- }
- }
-}
-
-void Pipeline::Optimizations::Local::moveSkipAndLimitBeforeProject(Pipeline* pipeline) {
- SourceContainer& sources = pipeline->sources;
- if (sources.empty())
- return;
+void Pipeline::optimizePipeline() {
+ SourceContainer optimizedSources;
- for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) {
- // This optimization only applies when a $project comes before a $skip or $limit.
- auto project = dynamic_cast<DocumentSourceProject*>(sources[i - 1].get());
- if (!project)
- continue;
+ SourceContainer::iterator itr = sources.begin();
- auto skip = dynamic_cast<DocumentSourceSkip*>(sources[i].get());
- auto limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get());
- if (!(skip || limit))
- continue;
-
- swap(sources[i], sources[i - 1]);
-
- // Start at back again. This is needed to handle cases with more than 1 $skip or
- // $limit (S means skip, L means limit, P means project)
- //
- // These would work without second pass (assuming back to front ordering)
- // PS -> SP
- // PL -> LP
- // PPL -> LPP
- // PPS -> SPP
- //
- // The following cases need a second pass to handle the second skip or limit
- // PLL -> LLP
- // PPLL -> LLPP
- // PLPL -> LLPP
- i = sources.size(); // decremented before next pass
- }
-}
-
-void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) {
- SourceContainer& sources = pipeline->sources;
- if (sources.empty())
- return;
-
- for (int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) {
- DocumentSourceLimit* limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get());
- DocumentSourceSkip* skip = dynamic_cast<DocumentSourceSkip*>(sources[i - 1].get());
- if (limit && skip) {
- // Increase limit by skip since the skipped docs now pass through the $limit
- limit->setLimit(limit->getLimit() + skip->getSkip());
- swap(sources[i], sources[i - 1]);
-
- // Start at back again. This is needed to handle cases with more than 1 $limit
- // (S means skip, L means limit)
- //
- // These two would work without second pass (assuming back to front ordering)
- // SL -> LS
- // SSL -> LSS
- //
- // The following cases need a second pass to handle the second limit
- // SLL -> LLS
- // SSLL -> LLSS
- // SLSL -> LLSS
- i = sources.size(); // decremented before next pass
- }
+ while (itr != sources.end() && std::next(itr) != sources.end()) {
+ invariant((*itr).get());
+ itr = (*itr).get()->optimizeAt(itr, &sources);
}
-}
-void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) {
- SourceContainer& sources = pipeline->sources;
- if (sources.empty())
- return;
-
- // move all sources to a temporary list
- SourceContainer tempSources;
- sources.swap(tempSources);
-
- // move the first one to the final list
- sources.push_back(tempSources[0]);
-
- // run through the sources, coalescing them or keeping them
- for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) {
- // If we can't coalesce the source with the last, then move it
- // to the final list, and make it the new last. (If we succeeded,
- // then we're still on the same last, and there's no need to move
- // or do anything with the source -- the destruction of tempSources
- // will take care of the rest.)
- intrusive_ptr<DocumentSource>& pLastSource = sources.back();
- intrusive_ptr<DocumentSource>& pTemp = tempSources[tempi];
- verify(pTemp && pLastSource);
- if (!pLastSource->coalesce(pTemp))
- sources.push_back(pTemp);
- }
-}
-
-void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) {
- SourceContainer& sources = pipeline->sources;
- SourceContainer newSources;
- for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) {
- if (auto out = (*it)->optimize()) {
- newSources.push_back(std::move(out));
- }
- }
- pipeline->sources = std::move(newSources);
-}
-
-void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) {
- SourceContainer& sources = pipeline->sources;
- if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) {
- if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) {
- const BSONObj redactSafePortion = match->redactSafePortion();
- if (!redactSafePortion.isEmpty()) {
- sources.push_front(DocumentSourceMatch::createFromBson(
- BSON("$match" << redactSafePortion).firstElement(), pipeline->pCtx));
- }
+ // Once we have reached our final number of stages, optimize each individually.
+ for (auto&& source : sources) {
+ if (auto out = source->optimize()) {
+ optimizedSources.push_back(out);
}
}
+ sources.swap(optimizedSources);
}
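
A standalone model (illustrative only, not MongoDB code) of the driver loop above: each element either merges with its right neighbor and retries at the same position, or advances. Here, equal adjacent values stand in for combinable stages.

    #include <cassert>
    #include <list>

    int main() {
        std::list<int> stages{1, 1, 2, 2, 2, 3};
        auto itr = stages.begin();
        while (itr != stages.end() && std::next(itr) != stages.end()) {
            if (*itr == *std::next(itr)) {
                stages.erase(std::next(itr));  // "merged": retry from this spot
            } else {
                ++itr;  // no optimization possible here; move right
            }
        }
        assert((stages == std::list<int>{1, 2, 3}));
        return 0;
    }
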
Status Pipeline::checkAuthForCommand(ClientBasic* client,
@@ -447,12 +324,11 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin
// objects even though only a subset of fields are needed.
// 2) Optimization IS NOT applied immediately following a $project or $group since it would
// add an unnecessary project (and therefore a deep-copy).
- for (size_t i = 0; i < shardPipe->sources.size(); i++) {
- DepsTracker dt; // ignored
- if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
+ for (auto&& source : shardPipe->sources) {
+ DepsTracker dt;
+ if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
return;
}
-
// if we get here, add the project.
shardPipe->sources.push_back(DocumentSourceProject::createFromBson(
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
@@ -522,8 +398,7 @@ void Pipeline::stitch() {
/* chain together the sources we found */
DocumentSource* prevSource = sources.front().get();
- for (SourceContainer::iterator iter(sources.begin() + 1), listEnd(sources.end());
- iter != listEnd;
+ for (SourceContainer::iterator iter(++sources.begin()), listEnd(sources.end()); iter != listEnd;
++iter) {
intrusive_ptr<DocumentSource> pTemp(*iter);
pTemp->setSource(prevSource);
@@ -571,9 +446,9 @@ DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
DepsTracker deps;
bool knowAllFields = false;
bool knowAllMeta = false;
- for (size_t i = 0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) {
+ for (auto&& source : sources) {
DepsTracker localDeps;
- DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps);
+ DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);
if (status == DocumentSource::NOT_SUPPORTED) {
// Assume this stage needs everything. We may still know something about our
@@ -595,6 +470,10 @@ DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
}
+
+ if (knowAllMeta && knowAllFields) {
+ break;
+ }
}
if (!knowAllFields)
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 65ff1ad97e9..431933e159e 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -52,6 +52,8 @@ class Privilege;
*/
class Pipeline : public IntrusiveCounterUnsigned {
public:
+ typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer;
+
/**
* Create a pipeline from the command.
*
@@ -64,7 +66,7 @@ public:
const BSONObj& cmdObj,
const boost::intrusive_ptr<ExpressionContext>& pCtx);
- /// Helper to implement Command::checkAuthForCommand
+ // Helper to implement Command::checkAuthForCommand
static Status checkAuthForCommand(ClientBasic* client,
const std::string& dbname,
const BSONObj& cmdObj);
@@ -95,6 +97,11 @@ public:
bool needsPrimaryShardMerger() const;
/**
+ * Modifies the pipeline, optimizing it by combining and swapping stages.
+ */
+ void optimizePipeline();
+
+ /**
* Returns any other collections involved in the pipeline in addition to the collection the
* aggregation is run on.
*/
@@ -189,7 +196,6 @@ private:
Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx);
- typedef std::deque<boost::intrusive_ptr<DocumentSource>> SourceContainer;
SourceContainer sources;
bool explain;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 4cebf691c26..91be4e3a92e 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -221,9 +221,9 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
Pipeline::SourceContainer& sources = pPipeline->sources;
// Inject a MongodImplementation to sources that need them.
- for (size_t i = 0; i < sources.size(); i++) {
+ for (auto&& source : sources) {
DocumentSourceNeedsMongod* needsMongod =
- dynamic_cast<DocumentSourceNeedsMongod*>(sources[i].get());
+ dynamic_cast<DocumentSourceNeedsMongod*>(source.get());
if (needsMongod) {
needsMongod->injectMongodInterface(std::make_shared<MongodImplementation>(pExpCtx));
}
@@ -434,11 +434,10 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline
pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
}
- while (!pipeline->sources.empty() && pSource->coalesce(pipeline->sources.front())) {
- pipeline->sources.pop_front();
- }
-
+ // Add the initial DocumentSourceCursor to the front of the pipeline. Then optimize again in
+ // case the new stage can be absorbed with the first stages of the pipeline.
pipeline->addInitialSource(pSource);
+ pipeline->optimizePipeline();
// DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
// deregister the PlanExecutor so that it can be registered with ClientCursor.
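
Illustrative example of why the pipeline is re-optimized here: if the remaining pipeline begins with a $limit, the second optimizePipeline() pass lets the new cursor absorb it through DocumentSourceCursor::optimizeAt():

    // [$cursor, $limit: 5, $project: {...}]
    //     -> [$cursor(_limit: 5), $project: {...}]
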
diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h
index 68763c2ac5c..af85fdff4b0 100644
--- a/src/mongo/db/pipeline/pipeline_optimizations.h
+++ b/src/mongo/db/pipeline/pipeline_optimizations.h
@@ -37,67 +37,6 @@
namespace mongo {
/**
- * This class holds optimizations applied to a single Pipeline.
- *
- * Each function has the same signature and takes a Pipeline as an in/out parameter.
- */
-class Pipeline::Optimizations::Local {
-public:
- /**
- * Moves matches before any adjacent sort phases.
- *
- * This means we sort fewer items. Neither sorts, nor matches (excluding $text)
- * change the documents in the stream, so this transformation shouldn't affect
- * the result.
- */
- static void moveMatchBeforeSort(Pipeline* pipeline);
-
- /**
- * Moves skip and limit before any adjacent project phases.
- *
- * While this is performance-neutral on its own, it enables other optimizations
- * such as combining sort and limit.
- */
- static void moveSkipAndLimitBeforeProject(Pipeline* pipeline);
-
- /**
- * Moves limits before any adjacent skip phases.
- *
- * This is more optimal for sharding since currently, we can only split
- * the pipeline at a single source and it is better to limit the results
- * coming from each shard. This also enables other optimizations like
- * coalescing the limit into a sort.
- */
- static void moveLimitBeforeSkip(Pipeline* pipeline);
-
- /**
- * Runs through the DocumentSources, and give each one the opportunity
- * to coalesce with its successor. If successful, remove the successor.
- *
- * This should generally be run after optimizations that reorder stages
- * to be most effective.
- *
- * NOTE: uses the DocumentSource::coalesce() method
- */
- static void coalesceAdjacent(Pipeline* pipeline);
-
- /**
- * Gives each DocumentSource the opportunity to optimize itself.
- *
- * NOTE: uses the DocumentSource::optimize() method
- */
- static void optimizeEachDocumentSource(Pipeline* pipeline);
-
- /**
- * Optimizes [$redact, $match] to [$match, $redact, $match] if possible.
- *
- * This gives us the ability to use indexes and reduce the number of
- * BSONObjs converted to Documents.
- */
- static void duplicateMatchBeforeInitalRedact(Pipeline* pipeline);
-};
-
-/**
* This class holds optimizations applied to a shard Pipeline and a merger Pipeline.
*
* Each function has the same signature and takes two Pipelines, both as an in/out parameters.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 333fc5963de..e936b895cca 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -99,7 +99,7 @@ class MoveLimitBeforeProject : public Base {
}
};
-class MoveMulitipleSkipsAndLimitsBeforeProject : public Base {
+class MoveMultipleSkipsAndLimitsBeforeProject : public Base {
string inputPipeJson() override {
return "[{$project: {a : 1}}, {$limit : 5}, {$skip : 3}]";
}
@@ -109,6 +109,21 @@ class MoveMulitipleSkipsAndLimitsBeforeProject : public Base {
}
};
+class SkipSkipLimitBecomesLimitSkip : public Base {
+ string inputPipeJson() override {
+ return "[{$skip : 3}"
+ ",{$skip : 5}"
+ ",{$limit: 5}"
+ "]";
+ }
+
+ string outputPipeJson() override {
+ return "[{$limit: 13}"
+ ",{$skip : 8}"
+ "]";
+ }
+};
+
class SortMatchProjSkipLimBecomesMatchTopKSortSkipProj : public Base {
string inputPipeJson() override {
return "[{$sort: {a: 1}}"
@@ -179,6 +194,16 @@ class DoNotRemoveNonEmptyMatch : public Base {
}
};
+class MoveMatchBeforeSort : public Base {
+ string inputPipeJson() override {
+ return "[{$sort: {b: 1}}, {$match: {a: 2}}]";
+ }
+
+ string outputPipeJson() override {
+ return "[{$match: {a: 2}}, {$sort: {sortKey: {b: 1}}}]";
+ }
+};
+
class LookupShouldCoalesceWithUnwindOnAs : public Base {
string inputPipeJson() {
return "[{$lookup: {from : 'coll2', as : 'same', localField: 'left', foreignField: "
@@ -234,6 +259,15 @@ class LookupShouldNotCoalesceWithUnwindNotOnAs : public Base {
}
};
+class MatchShouldDuplicateItselfBeforeRedact : public Base {
+ string inputPipeJson() {
+ return "[{$redact: '$$PRUNE'}, {$match: {a: 1, b:12}}]";
+ }
+ string outputPipeJson() {
+ return "[{$match: {a: 1, b:12}}, {$redact: '$$PRUNE'}, {$match: {a: 1, b:12}}]";
+ }
+};
+
} // namespace Local
namespace Sharded {
@@ -600,16 +634,19 @@ public:
add<Optimizations::Local::RemoveSkipZero>();
add<Optimizations::Local::MoveLimitBeforeProject>();
add<Optimizations::Local::MoveSkipBeforeProject>();
- add<Optimizations::Local::MoveMulitipleSkipsAndLimitsBeforeProject>();
+ add<Optimizations::Local::MoveMultipleSkipsAndLimitsBeforeProject>();
+ add<Optimizations::Local::SkipSkipLimitBecomesLimitSkip>();
add<Optimizations::Local::SortMatchProjSkipLimBecomesMatchTopKSortSkipProj>();
add<Optimizations::Local::DoNotRemoveSkipOne>();
add<Optimizations::Local::RemoveEmptyMatch>();
add<Optimizations::Local::RemoveMultipleEmptyMatches>();
+ add<Optimizations::Local::MoveMatchBeforeSort>();
add<Optimizations::Local::DoNotRemoveNonEmptyMatch>();
add<Optimizations::Local::LookupShouldCoalesceWithUnwindOnAs>();
add<Optimizations::Local::LookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty>();
add<Optimizations::Local::LookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex>();
add<Optimizations::Local::LookupShouldNotCoalesceWithUnwindNotOnAs>();
+ add<Optimizations::Local::MatchShouldDuplicateItselfBeforeRedact>();
add<Optimizations::Sharded::Empty>();
add<Optimizations::Sharded::coalesceLookUpAndUnwind::ShouldCoalesceUnwindOnAs>();
add<Optimizations::Sharded::coalesceLookUpAndUnwind::