summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source.h')
-rw-r--r--src/mongo/db/pipeline/document_source.h1768
1 files changed, 894 insertions, 874 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 51f5ae7c3b3..4902f8b4a40 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -51,1062 +51,1082 @@
namespace mongo {
- class Accumulator;
- class Document;
- class Expression;
- class ExpressionFieldPath;
- class ExpressionObject;
- class DocumentSourceLimit;
- class PlanExecutor;
-
- class DocumentSource : public IntrusiveCounterUnsigned {
- public:
- virtual ~DocumentSource() {}
-
- /** Returns the next Document if there is one or boost::none if at EOF.
- * Subclasses must call pExpCtx->checkForInterupt().
- */
- virtual boost::optional<Document> getNext() = 0;
-
- /**
- * Inform the source that it is no longer needed and may release its resources. After
- * dispose() is called the source must still be able to handle iteration requests, but may
- * become eof().
- * NOTE: For proper mutex yielding, dispose() must be called on any DocumentSource that will
- * not be advanced until eof(), see SERVER-6123.
- */
- virtual void dispose();
-
- /**
- Get the source's name.
+class Accumulator;
+class Document;
+class Expression;
+class ExpressionFieldPath;
+class ExpressionObject;
+class DocumentSourceLimit;
+class PlanExecutor;
+
+class DocumentSource : public IntrusiveCounterUnsigned {
+public:
+ virtual ~DocumentSource() {}
+
+ /** Returns the next Document if there is one or boost::none if at EOF.
+ * Subclasses must call pExpCtx->checkForInterrupt().
+ */
+ virtual boost::optional<Document> getNext() = 0;
- @returns the std::string name of the source as a constant string;
- this is static, and there's no need to worry about adopting it
- */
- virtual const char *getSourceName() const;
+ /**
+ * Inform the source that it is no longer needed and may release its resources. After
+ * dispose() is called the source must still be able to handle iteration requests, but may
+ * become eof().
+ * NOTE: For proper mutex yielding, dispose() must be called on any DocumentSource that will
+ * not be advanced until eof(), see SERVER-6123.
+ */
+ virtual void dispose();
- /**
- Set the underlying source this source should use to get Documents
- from.
+ /**
+ Get the source's name.
- It is an error to set the source more than once. This is to
- prevent changing sources once the original source has been started;
- this could break the state maintained by the DocumentSource.
+ @returns the name of the source as a constant C string;
+ this is static, and there's no need to worry about adopting it
+ */
+ virtual const char* getSourceName() const;
- This pointer is not reference counted because that has led to
- some circular references. As a result, this doesn't keep
- sources alive, and is only intended to be used temporarily for
- the lifetime of a Pipeline::run().
+ /**
+ Set the underlying source this source should use to get Documents
+ from.
- @param pSource the underlying source to use
- */
- virtual void setSource(DocumentSource *pSource);
+ It is an error to set the source more than once. This is to
+ prevent changing sources once the original source has been started;
+ this could break the state maintained by the DocumentSource.
- /**
- Attempt to coalesce this DocumentSource with its successor in the
- document processing pipeline. If successful, the successor
- DocumentSource should be removed from the pipeline and discarded.
+ This pointer is not reference counted because that has led to
+ some circular references. As a result, this doesn't keep
+ sources alive, and is only intended to be used temporarily for
+ the lifetime of a Pipeline::run().
- If successful, this operation can be applied repeatedly, in an
- attempt to coalesce several sources together.
+ @param pSource the underlying source to use
+ */
+ virtual void setSource(DocumentSource* pSource);
- The default implementation is to do nothing, and return false.
+ /**
+ Attempt to coalesce this DocumentSource with its successor in the
+ document processing pipeline. If successful, the successor
+ DocumentSource should be removed from the pipeline and discarded.
- @param pNextSource the next source in the document processing chain.
- @returns whether or not the attempt to coalesce was successful or not;
- if the attempt was not successful, nothing has been changed
- */
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource> &pNextSource);
+ If successful, this operation can be applied repeatedly, in an
+ attempt to coalesce several sources together.
- /**
- * Returns an optimized DocumentSource that is semantically equivalent to this one, or
- * nullptr if this stage is a no-op. Implementations are allowed to modify themselves
- * in-place and return a pointer to themselves. For best results, first coalesce compatible
- * sources using coalesce().
- *
- * This is intended for any operations that include expressions, and provides a hook for
- * those to optimize those operations.
- *
- * The default implementation is to do nothing and return yourself.
- */
- virtual boost::intrusive_ptr<DocumentSource> optimize();
+ The default implementation is to do nothing, and return false.
- enum GetDepsReturn {
- NOT_SUPPORTED = 0x0, // The full object and all metadata may be required
- SEE_NEXT = 0x1, // Later stages could need either fields or metadata
- EXHAUSTIVE_FIELDS = 0x2, // Later stages won't need more fields from input
- EXHAUSTIVE_META = 0x4, // Later stages won't need more metadata from input
- EXHAUSTIVE_ALL = EXHAUSTIVE_FIELDS | EXHAUSTIVE_META, // Later stages won't need either
- };
+ @param pNextSource the next source in the document processing chain.
+ @returns whether the attempt to coalesce was successful;
+ if the attempt was not successful, nothing has been changed
+ */
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
- /**
- * Get the dependencies this operation needs to do its job.
- */
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
- return NOT_SUPPORTED;
- }
+ /**
+ * Returns an optimized DocumentSource that is semantically equivalent to this one, or
+ * nullptr if this stage is a no-op. Implementations are allowed to modify themselves
+ * in-place and return a pointer to themselves. For best results, first coalesce compatible
+ * sources using coalesce().
+ *
+ * This is intended for any operations that include expressions, and provides a hook for
+ * those to optimize those operations.
+ *
+ * The default implementation is to do nothing and return yourself.
+ */
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
+
+ enum GetDepsReturn {
+ NOT_SUPPORTED = 0x0, // The full object and all metadata may be required
+ SEE_NEXT = 0x1, // Later stages could need either fields or metadata
+ EXHAUSTIVE_FIELDS = 0x2, // Later stages won't need more fields from input
+ EXHAUSTIVE_META = 0x4, // Later stages won't need more metadata from input
+ EXHAUSTIVE_ALL = EXHAUSTIVE_FIELDS | EXHAUSTIVE_META, // Later stages won't need either
+ };
- /**
- * In the default case, serializes the DocumentSource and adds it to the std::vector<Value>.
- *
- * A subclass may choose to overwrite this, rather than serialize,
- * if it should output multiple stages (eg, $sort sometimes also outputs a $limit).
- */
+ /**
+ * Get the dependencies this operation needs to do its job.
+ */
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
+ return NOT_SUPPORTED;
+ }
- virtual void serializeToArray(std::vector<Value>& array, bool explain = false) const;
+ /**
+ * In the default case, serializes the DocumentSource and adds it to the std::vector<Value>.
+ *
+ * A subclass may choose to override this, rather than serialize,
+ * if it should output multiple stages (eg, $sort sometimes also outputs a $limit).
+ */
- /// Returns true if doesn't require an input source (most DocumentSources do).
- virtual bool isValidInitialSource() const { return false; }
+ virtual void serializeToArray(std::vector<Value>& array, bool explain = false) const;
- protected:
- /**
- Base constructor.
- */
- DocumentSource(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /// Returns true if this source doesn't require an input source (most DocumentSources do).
+ virtual bool isValidInitialSource() const {
+ return false;
+ }
- /*
- Most DocumentSources have an underlying source they get their data
- from. This is a convenience for them.
+protected:
+ /**
+ Base constructor.
+ */
+ DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- The default implementation of setSource() sets this; if you don't
- need a source, override that to verify(). The default is to
- verify() if this has already been set.
- */
- DocumentSource *pSource;
+ /*
+ Most DocumentSources have an underlying source they get their data
+ from. This is a convenience for them.
- boost::intrusive_ptr<ExpressionContext> pExpCtx;
+ The default implementation of setSource() sets this; if you don't
+ need a source, override that to verify(). The default is to
+ verify() if this has already been set.
+ */
+ DocumentSource* pSource;
- private:
- /**
- * Create a Value that represents the document source.
- *
- * This is used by the default implementation of serializeToArray() to add this object
- * to a pipeline being serialized. Returning a missing() Value results in no entry
- * being added to the array for this stage (DocumentSource).
- */
- virtual Value serialize(bool explain = false) const = 0;
- };
+ boost::intrusive_ptr<ExpressionContext> pExpCtx;
- /** This class marks DocumentSources that should be split between the merger and the shards.
- * See Pipeline::Optimizations::Sharded::findSplitPoint() for details.
+private:
+ /**
+ * Create a Value that represents the document source.
+ *
+ * This is used by the default implementation of serializeToArray() to add this object
+ * to a pipeline being serialized. Returning a missing() Value results in no entry
+ * being added to the array for this stage (DocumentSource).
*/
- class SplittableDocumentSource {
- public:
- /** returns a source to be run on the shards.
- * if NULL, don't run on shards
- */
- virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
-
- /** returns a source that combines results from shards.
- * if NULL, don't run on merger
- */
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0;
- protected:
- // It is invalid to delete through a SplittableDocumentSource-typed pointer.
- virtual ~SplittableDocumentSource() {}
- };
+ virtual Value serialize(bool explain = false) const = 0;
+};
-
- /** This class marks DocumentSources which need mongod-specific functionality.
- * It causes a MongodInterface to be injected when in a mongod and prevents mongos from
- * merging pipelines containing this stage.
+/** This class marks DocumentSources that should be split between the merger and the shards.
+ * See Pipeline::Optimizations::Sharded::findSplitPoint() for details.
+ */
+class SplittableDocumentSource {
+public:
+ /** returns a source to be run on the shards.
+ * if NULL, don't run on shards
*/
- class DocumentSourceNeedsMongod {
- public:
- // Wraps mongod-specific functions to allow linking into mongos.
- class MongodInterface {
- public:
- virtual ~MongodInterface() {};
-
- /**
- * Always returns a DBDirectClient.
- * Callers must not cache the returned pointer outside the scope of a single function.
- */
- virtual DBClientBase* directClient() = 0;
-
- // Note that in some rare cases this could return a false negative but will never return
- // a false positive. This method will be fixed in the future once it becomes possible to
- // avoid false negatives.
- virtual bool isSharded(const NamespaceString& ns) = 0;
-
- virtual bool isCapped(const NamespaceString& ns) = 0;
-
- /**
- * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
- */
- virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0;
-
- // Add new methods as needed.
- };
-
- void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
- _mongod = mongod;
- }
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
- protected:
- // It is invalid to delete through a DocumentSourceNeedsMongod-typed pointer.
- virtual ~DocumentSourceNeedsMongod() {}
+ /** returns a source that combines results from shards.
+ * if NULL, don't run on merger
+ */
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() = 0;
- // Gives subclasses access to a MongodInterface implementation
- std::shared_ptr<MongodInterface> _mongod;
- };
+protected:
+ // It is invalid to delete through a SplittableDocumentSource-typed pointer.
+ virtual ~SplittableDocumentSource() {}
+};
- class DocumentSourceBsonArray :
- public DocumentSource {
+/** This class marks DocumentSources which need mongod-specific functionality.
+ * It causes a MongodInterface to be injected when in a mongod and prevents mongos from
+ * merging pipelines containing this stage.
+ */
+class DocumentSourceNeedsMongod {
+public:
+ // Wraps mongod-specific functions to allow linking into mongos.
+ class MongodInterface {
public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual Value serialize(bool explain = false) const;
- virtual void setSource(DocumentSource *pSource);
- virtual bool isValidInitialSource() const { return true; }
+ virtual ~MongodInterface(){};
/**
- Create a document source based on a BSON array.
-
- This is usually put at the beginning of a chain of document sources
- in order to fetch data from the database.
-
- CAUTION: the BSON is not read until the source is used. Any
- elements that appear after these documents must not be read until
- this source is exhausted.
-
- @param array the BSON array to treat as a document source
- @param pExpCtx the expression context for the pipeline
- @returns the newly created document source
- */
- static boost::intrusive_ptr<DocumentSourceBsonArray> create(
- const BSONObj& array,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
-
- private:
- DocumentSourceBsonArray(
- const BSONObj& embeddedArray,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ * Always returns a DBDirectClient.
+ * Callers must not cache the returned pointer outside the scope of a single function.
+ */
+ virtual DBClientBase* directClient() = 0;
- BSONObj embeddedObject;
- BSONObjIterator arrayIterator;
- };
+ // Note that in some rare cases this could return a false negative but will never return
+ // a false positive. This method will be fixed in the future once it becomes possible to
+ // avoid false negatives.
+ virtual bool isSharded(const NamespaceString& ns) = 0;
+ virtual bool isCapped(const NamespaceString& ns) = 0;
- class DocumentSourceCommandShards :
- public DocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual Value serialize(bool explain = false) const;
- virtual void setSource(DocumentSource *pSource);
- virtual bool isValidInitialSource() const { return true; }
-
- /* convenient shorthand for a commonly used type */
- typedef std::vector<Strategy::CommandResult> ShardOutput;
-
- /** Returns the result arrays from shards using the 2.4 protocol.
- * Call this instead of getNext() if you want access to the raw streams.
- * This method should only be called at most once.
+ /**
+ * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
*/
- std::vector<BSONArray> getArrays();
+ virtual BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) = 0;
- /**
- Create a DocumentSource that wraps the output of many shards
+ // Add new methods as needed.
+ };
- @param shardOutput output from the individual shards
- @param pExpCtx the expression context for the pipeline
- @returns the newly created DocumentSource
- */
- static boost::intrusive_ptr<DocumentSourceCommandShards> create(
- const ShardOutput& shardOutput,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
+ _mongod = mongod;
+ }
- private:
- DocumentSourceCommandShards(const ShardOutput& shardOutput,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+protected:
+ // It is invalid to delete through a DocumentSourceNeedsMongod-typed pointer.
+ virtual ~DocumentSourceNeedsMongod() {}
- /**
- Advance to the next document, setting pCurrent appropriately.
+ // Gives subclasses access to a MongodInterface implementation
+ std::shared_ptr<MongodInterface> _mongod;
+};
- Adjusts pCurrent, pBsonSource, and iterator, as needed. On exit,
- pCurrent is the Document to return, or NULL. If NULL, this
- indicates there is nothing more to return.
- */
- void getNextDocument();
-
- bool unstarted;
- bool hasCurrent;
- bool newSource; // set to true for the first item of a new source
- boost::intrusive_ptr<DocumentSourceBsonArray> pBsonSource;
- Document pCurrent;
- ShardOutput::const_iterator iterator;
- ShardOutput::const_iterator listEnd;
- };
+class DocumentSourceBsonArray : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual Value serialize(bool explain = false) const;
+ virtual void setSource(DocumentSource* pSource);
+ virtual bool isValidInitialSource() const {
+ return true;
+ }
/**
- * Constructs and returns Documents from the BSONObj objects produced by a supplied
- * PlanExecutor.
- *
- * An object of this type may only be used by one thread, see SERVER-6123.
+ Create a document source based on a BSON array.
+
+ This is usually put at the beginning of a chain of document sources
+ in order to fetch data from the database.
+
+ CAUTION: the BSON is not read until the source is used. Any
+ elements that appear after these documents must not be read until
+ this source is exhausted.
+
+ @param array the BSON array to treat as a document source
+ @param pExpCtx the expression context for the pipeline
+ @returns the newly created document source
+ */
+ static boost::intrusive_ptr<DocumentSourceBsonArray> create(
+ const BSONObj& array, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceBsonArray(const BSONObj& embeddedArray,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ BSONObj embeddedObject;
+ BSONObjIterator arrayIterator;
+};
+
+
+class DocumentSourceCommandShards : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual Value serialize(bool explain = false) const;
+ virtual void setSource(DocumentSource* pSource);
+ virtual bool isValidInitialSource() const {
+ return true;
+ }
+
+ /* convenient shorthand for a commonly used type */
+ typedef std::vector<Strategy::CommandResult> ShardOutput;
+
+ /** Returns the result arrays from shards using the 2.4 protocol.
+ * Call this instead of getNext() if you want access to the raw streams.
+ * This method should only be called at most once.
*/
- class DocumentSourceCursor :
- public DocumentSource {
- public:
- // virtuals from DocumentSource
- virtual ~DocumentSourceCursor();
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual Value serialize(bool explain = false) const;
- virtual void setSource(DocumentSource *pSource);
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource);
- virtual bool isValidInitialSource() const { return true; }
- virtual void dispose();
+ std::vector<BSONArray> getArrays();
- /**
- * Create a document source based on a passed-in PlanExecutor.
- *
- * This is usually put at the beginning of a chain of document sources
- * in order to fetch data from the database.
- */
- static boost::intrusive_ptr<DocumentSourceCursor> create(
- const std::string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ Create a DocumentSource that wraps the output of many shards
- /*
- Record the query that was specified for the cursor this wraps, if
- any.
+ @param shardOutput output from the individual shards
+ @param pExpCtx the expression context for the pipeline
+ @returns the newly created DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSourceCommandShards> create(
+ const ShardOutput& shardOutput, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- This should be captured after any optimizations are applied to
- the pipeline so that it reflects what is really used.
+private:
+ DocumentSourceCommandShards(const ShardOutput& shardOutput,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- This gets used for explain output.
+ /**
+ Advance to the next document, setting pCurrent appropriately.
- @param pBsonObj the query to record
- */
- void setQuery(const BSONObj& query) { _query = query; }
+ Adjusts pCurrent, pBsonSource, and iterator, as needed. On exit,
+ pCurrent is the Document to return, or NULL. If NULL, this
+ indicates there is nothing more to return.
+ */
+ void getNextDocument();
- /*
- Record the sort that was specified for the cursor this wraps, if
- any.
+ bool unstarted;
+ bool hasCurrent;
+ bool newSource; // set to true for the first item of a new source
+ boost::intrusive_ptr<DocumentSourceBsonArray> pBsonSource;
+ Document pCurrent;
+ ShardOutput::const_iterator iterator;
+ ShardOutput::const_iterator listEnd;
+};
- This should be captured after any optimizations are applied to
- the pipeline so that it reflects what is really used.
- This gets used for explain output.
+/**
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied
+ * PlanExecutor.
+ *
+ * An object of this type may only be used by one thread, see SERVER-6123.
+ */
+class DocumentSourceCursor : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual ~DocumentSourceCursor();
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual Value serialize(bool explain = false) const;
+ virtual void setSource(DocumentSource* pSource);
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource);
+ virtual bool isValidInitialSource() const {
+ return true;
+ }
+ virtual void dispose();
- @param pBsonObj the sort to record
- */
- void setSort(const BSONObj& sort) { _sort = sort; }
+ /**
+ * Create a document source based on a passed-in PlanExecutor.
+ *
+ * This is usually put at the beginning of a chain of document sources
+ * in order to fetch data from the database.
+ */
+ static boost::intrusive_ptr<DocumentSourceCursor> create(
+ const std::string& ns,
+ const std::shared_ptr<PlanExecutor>& exec,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- /**
- * Informs this object of projection and dependency information.
- *
- * @param projection A projection specification describing the fields needed by the rest of
- * the pipeline.
- * @param deps The output of DepsTracker::toParsedDeps
- */
- void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
+ /*
+ Record the query that was specified for the cursor this wraps, if
+ any.
- /// returns -1 for no limit
- long long getLimit() const;
+ This should be captured after any optimizations are applied to
+ the pipeline so that it reflects what is really used.
- private:
- DocumentSourceCursor(
- const std::string& ns,
- const std::shared_ptr<PlanExecutor>& exec,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ This gets used for explain output.
- void loadBatch();
+ @param query the query to record
+ */
+ void setQuery(const BSONObj& query) {
+ _query = query;
+ }
- std::deque<Document> _currentBatch;
+ /*
+ Record the sort that was specified for the cursor this wraps, if
+ any.
- // BSONObj members must outlive _projection and cursor.
- BSONObj _query;
- BSONObj _sort;
- BSONObj _projection;
- boost::optional<ParsedDeps> _dependencies;
- boost::intrusive_ptr<DocumentSourceLimit> _limit;
- long long _docsAddedToBatches; // for _limit enforcement
+ This should be captured after any optimizations are applied to
+ the pipeline so that it reflects what is really used.
- const std::string _ns;
- std::shared_ptr<PlanExecutor> _exec; // PipelineProxyStage holds a weak_ptr to this.
- };
+ This gets used for explain output.
+ @param sort the sort to record
+ */
+ void setSort(const BSONObj& sort) {
+ _sort = sort;
+ }
- class DocumentSourceGroup : public DocumentSource
- , public SplittableDocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual boost::intrusive_ptr<DocumentSource> optimize();
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- virtual void dispose();
- virtual Value serialize(bool explain = false) const;
+ /**
+ * Informs this object of projection and dependency information.
+ *
+ * @param projection A projection specification describing the fields needed by the rest of
+ * the pipeline.
+ * @param deps The output of DepsTracker::toParsedDeps
+ */
+ void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
- static boost::intrusive_ptr<DocumentSourceGroup> create(
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /// returns -1 for no limit
+ long long getLimit() const;
- /**
- Add an accumulator.
+private:
+ DocumentSourceCursor(const std::string& ns,
+ const std::shared_ptr<PlanExecutor>& exec,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- Accumulators become fields in the Documents that result from
- grouping. Each unique group document must have it's own
- accumulator; the accumulator factory is used to create that.
+ void loadBatch();
- @param fieldName the name the accumulator result will have in the
- result documents
- @param pAccumulatorFactory used to create the accumulator for the
- group field
- */
- void addAccumulator(const std::string& fieldName,
- boost::intrusive_ptr<Accumulator> (*pAccumulatorFactory)(),
- const boost::intrusive_ptr<Expression> &pExpression);
+ std::deque<Document> _currentBatch;
- /// Tell this source if it is doing a merge from shards. Defaults to false.
- void setDoingMerge(bool doingMerge) { _doingMerge = doingMerge; }
+ // BSONObj members must outlive _projection and cursor.
+ BSONObj _query;
+ BSONObj _sort;
+ BSONObj _projection;
+ boost::optional<ParsedDeps> _dependencies;
+ boost::intrusive_ptr<DocumentSourceLimit> _limit;
+ long long _docsAddedToBatches; // for _limit enforcement
- /**
- Create a grouping DocumentSource from BSON.
+ const std::string _ns;
+ std::shared_ptr<PlanExecutor> _exec; // PipelineProxyStage holds a weak_ptr to this.
+};
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $group.
- @param pBsonElement the BSONELement that defines the group
- @param pExpCtx the expression context
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+class DocumentSourceGroup : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
+ virtual void dispose();
+ virtual Value serialize(bool explain = false) const;
- // Virtuals for SplittableDocumentSource
- virtual boost::intrusive_ptr<DocumentSource> getShardSource();
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
+ static boost::intrusive_ptr<DocumentSourceGroup> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- static const char groupName[];
-
- private:
- DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
-
- /// Spill groups map to disk and returns an iterator to the file.
- std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
-
- // Only used by spill. Would be function-local if that were legal in C++03.
- class SpillSTLComparator;
+ /**
+ Add an accumulator.
- /*
- Before returning anything, this source must fetch everything from
- the underlying source and group it. populate() is used to do that
- on the first call to any method on this source. The populated
- boolean indicates that this has been done.
- */
- void populate();
- bool populated;
+ Accumulators become fields in the Documents that result from
+ grouping. Each unique group document must have its own
+ accumulator; the accumulator factory is used to create that.
- /**
- * Parses the raw id expression into _idExpressions and possibly _idFieldNames.
- */
- void parseIdExpression(BSONElement groupField, const VariablesParseState& vps);
+ @param fieldName the name the accumulator result will have in the
+ result documents
+ @param pAccumulatorFactory used to create the accumulator for the
+ group field
+ */
+ void addAccumulator(const std::string& fieldName,
+ boost::intrusive_ptr<Accumulator>(*pAccumulatorFactory)(),
+ const boost::intrusive_ptr<Expression>& pExpression);
- /**
- * Computes the internal representation of the group key.
- */
- Value computeId(Variables* vars);
+ /// Tell this source if it is doing a merge from shards. Defaults to false.
+ void setDoingMerge(bool doingMerge) {
+ _doingMerge = doingMerge;
+ }
- /**
- * Converts the internal representation of the group key to the _id shape specified by the
- * user.
- */
- Value expandId(const Value& val);
-
-
- typedef std::vector<boost::intrusive_ptr<Accumulator> > Accumulators;
- typedef boost::unordered_map<Value, Accumulators, Value::Hash> GroupsMap;
- GroupsMap groups;
-
- /*
- The field names for the result documents and the accumulator
- factories for the result documents. The Expressions are the
- common expressions used by each instance of each accumulator
- in order to find the right-hand side of what gets added to the
- accumulator. Note that each of those is the same for each group,
- so we can share them across all groups by adding them to the
- accumulators after we use the factories to make a new set of
- accumulators for each new group.
-
- These three vectors parallel each other.
- */
- std::vector<std::string> vFieldName;
- std::vector<boost::intrusive_ptr<Accumulator> (*)()> vpAccumulatorFactory;
- std::vector<boost::intrusive_ptr<Expression> > vpExpression;
-
-
- Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
-
- bool _doingMerge;
- bool _spilled;
- const bool _extSortAllowed;
- const int _maxMemoryUsageBytes;
- std::unique_ptr<Variables> _variables;
- std::vector<std::string> _idFieldNames; // used when id is a document
- std::vector<boost::intrusive_ptr<Expression> > _idExpressions;
-
- // only used when !_spilled
- GroupsMap::iterator groupsIterator;
-
- // only used when _spilled
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
- std::pair<Value, Value> _firstPartOfNextGroup;
- Value _currentId;
- Accumulators _currentAccumulators;
- };
+ /**
+ Create a grouping DocumentSource from BSON.
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been determined to be an Object with an
+ element named $group.
- class DocumentSourceMatch : public DocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource);
- virtual Value serialize(bool explain = false) const;
- virtual boost::intrusive_ptr<DocumentSource> optimize();
- virtual void setSource(DocumentSource* Source);
+ @param pBsonElement the BSONElement that defines the group
+ @param pExpCtx the expression context
+ @returns the grouping DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- /**
- Create a filter.
+ // Virtuals for SplittableDocumentSource
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource();
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
- @param pBsonElement the raw BSON specification for the filter
- @returns the filter
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pCtx);
-
- /// Returns the query in Matcher syntax.
- BSONObj getQuery() const;
-
- static const char matchName[];
-
- /** Returns the portion of the match that can safely be promoted to before a $redact.
- * If this returns an empty BSONObj, no part of this match may safely be promoted.
- *
- * To be safe to promote, removing a field from a document to be matched must not cause
- * that document to be accepted when it would otherwise be rejected. As an example,
- * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that
- * running this filter before a redact that would remove the name field would leak
- * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents
- * that have had their age field removed.
- */
- BSONObj redactSafePortion() const;
+ static const char groupName[];
- static bool isTextQuery(const BSONObj& query);
- bool isTextQuery() const { return _isTextQuery; }
+private:
+ DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- private:
- DocumentSourceMatch(const BSONObj &query,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /// Spill groups map to disk and returns an iterator to the file.
+ std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
- std::unique_ptr<Matcher> matcher;
- bool _isTextQuery;
- };
+ // Only used by spill. Would be function-local if that were legal in C++03.
+ class SpillSTLComparator;
- class DocumentSourceMergeCursors :
- public DocumentSource {
- public:
- typedef std::vector<std::pair<ConnectionString, CursorId> > CursorIds;
+ /*
+ Before returning anything, this source must fetch everything from
+ the underlying source and group it. populate() is used to do that
+ on the first call to any method on this source. The populated
+ boolean indicates that this has been done.
+ */
+ void populate();
+ bool populated;
- // virtuals from DocumentSource
- boost::optional<Document> getNext();
- virtual void setSource(DocumentSource *pSource);
- virtual const char *getSourceName() const;
- virtual void dispose();
- virtual Value serialize(bool explain = false) const;
- virtual bool isValidInitialSource() const { return true; }
+ /**
+ * Parses the raw id expression into _idExpressions and possibly _idFieldNames.
+ */
+ void parseIdExpression(BSONElement groupField, const VariablesParseState& vps);
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ * Computes the internal representation of the group key.
+ */
+ Value computeId(Variables* vars);
- static boost::intrusive_ptr<DocumentSource> create(
- const CursorIds& cursorIds,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ * Converts the internal representation of the group key to the _id shape specified by the
+ * user.
+ */
+ Value expandId(const Value& val);
+
+
+ typedef std::vector<boost::intrusive_ptr<Accumulator>> Accumulators;
+ typedef boost::unordered_map<Value, Accumulators, Value::Hash> GroupsMap;
+ GroupsMap groups;
+
+ /*
+ The field names for the result documents and the accumulator
+ factories for the result documents. The Expressions are the
+ common expressions used by each instance of each accumulator
+ in order to find the right-hand side of what gets added to the
+ accumulator. Note that each of those is the same for each group,
+ so we can share them across all groups by adding them to the
+ accumulators after we use the factories to make a new set of
+ accumulators for each new group.
+
+ These three vectors parallel each other.
+ */
+ std::vector<std::string> vFieldName;
+ std::vector<boost::intrusive_ptr<Accumulator>(*)()> vpAccumulatorFactory;
+ std::vector<boost::intrusive_ptr<Expression>> vpExpression;
+
+
+ Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
+
+ bool _doingMerge;
+ bool _spilled;
+ const bool _extSortAllowed;
+ const int _maxMemoryUsageBytes;
+ std::unique_ptr<Variables> _variables;
+ std::vector<std::string> _idFieldNames; // used when id is a document
+ std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
+
+ // only used when !_spilled
+ GroupsMap::iterator groupsIterator;
+
+ // only used when _spilled
+ std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
+ std::pair<Value, Value> _firstPartOfNextGroup;
+ Value _currentId;
+ Accumulators _currentAccumulators;
+};
+
+
+class DocumentSourceMatch : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& nextSource);
+ virtual Value serialize(bool explain = false) const;
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
+ virtual void setSource(DocumentSource* Source);
- static const char name[];
+ /**
+ Create a filter.
- /** Returns non-owning pointers to cursors managed by this stage.
- * Call this instead of getNext() if you want access to the raw streams.
- * This method should only be called at most once.
- */
- std::vector<DBClientCursor*> getCursors();
+ @param pBsonElement the raw BSON specification for the filter
+ @returns the filter
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
- /**
- * Returns the next object from the cursor, throwing an appropriate exception if the cursor
- * reported an error. This is a better form of DBClientCursor::nextSafe.
- */
- static Document nextSafeFrom(DBClientCursor* cursor);
+ /// Returns the query in Matcher syntax.
+ BSONObj getQuery() const;
- private:
+ static const char matchName[];
- struct CursorAndConnection {
- CursorAndConnection(ConnectionString host, NamespaceString ns, CursorId id);
- ScopedDbConnection connection;
- DBClientCursor cursor;
- };
+ /** Returns the portion of the match that can safely be promoted to before a $redact.
+ * If this returns an empty BSONObj, no part of this match may safely be promoted.
+ *
+ * To be safe to promote, removing a field from a document to be matched must not cause
+ * that document to be accepted when it would otherwise be rejected. As an example,
+ * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that
+ * running this filter before a redact that would remove the name field would leak
+ * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents
+ * that have had their age field removed.
+ */
+ BSONObj redactSafePortion() const;
+
+ static bool isTextQuery(const BSONObj& query);
+ bool isTextQuery() const {
+ return _isTextQuery;
+ }
+
+private:
+ DocumentSourceMatch(const BSONObj& query,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ std::unique_ptr<Matcher> matcher;
+ bool _isTextQuery;
+};
+
+class DocumentSourceMergeCursors : public DocumentSource {
+public:
+ typedef std::vector<std::pair<ConnectionString, CursorId>> CursorIds;
+
+ // virtuals from DocumentSource
+ boost::optional<Document> getNext();
+ virtual void setSource(DocumentSource* pSource);
+ virtual const char* getSourceName() const;
+ virtual void dispose();
+ virtual Value serialize(bool explain = false) const;
+ virtual bool isValidInitialSource() const {
+ return true;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static boost::intrusive_ptr<DocumentSource> create(
+ const CursorIds& cursorIds, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static const char name[];
+
+ /** Returns non-owning pointers to cursors managed by this stage.
+ * Call this instead of getNext() if you want access to the raw streams.
+ * This method should only be called at most once.
+ */
+ std::vector<DBClientCursor*> getCursors();
- // using list to enable removing arbitrary elements
- typedef std::list<std::shared_ptr<CursorAndConnection> > Cursors;
+ /**
+ * Returns the next object from the cursor, throwing an appropriate exception if the cursor
+ * reported an error. This is a better form of DBClientCursor::nextSafe.
+ */
+ static Document nextSafeFrom(DBClientCursor* cursor);
- DocumentSourceMergeCursors(
- const CursorIds& cursorIds,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+private:
+ struct CursorAndConnection {
+ CursorAndConnection(ConnectionString host, NamespaceString ns, CursorId id);
+ ScopedDbConnection connection;
+ DBClientCursor cursor;
+ };
- // Converts _cursorIds into active _cursors.
- void start();
+ // using list to enable removing arbitrary elements
+ typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
- // This is the description of cursors to merge.
- const CursorIds _cursorIds;
+ DocumentSourceMergeCursors(const CursorIds& cursorIds,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- // These are the actual cursors we are merging. Created lazily.
- Cursors _cursors;
- Cursors::iterator _currentCursor;
+ // Converts _cursorIds into active _cursors.
+ void start();
- bool _unstarted;
- };
+ // This is the description of cursors to merge.
+ const CursorIds _cursorIds;
- class DocumentSourceOut : public DocumentSource
- , public SplittableDocumentSource
- , public DocumentSourceNeedsMongod {
- public:
- // virtuals from DocumentSource
- virtual ~DocumentSourceOut();
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual Value serialize(bool explain = false) const;
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
+ // These are the actual cursors we are merging. Created lazily.
+ Cursors _cursors;
+ Cursors::iterator _currentCursor;
- // Virtuals for SplittableDocumentSource
- virtual boost::intrusive_ptr<DocumentSource> getShardSource() { return NULL; }
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource() { return this; }
+ bool _unstarted;
+};
- const NamespaceString& getOutputNs() const { return _outputNs; }
+class DocumentSourceOut : public DocumentSource,
+ public SplittableDocumentSource,
+ public DocumentSourceNeedsMongod {
+public:
+ // virtuals from DocumentSource
+ virtual ~DocumentSourceOut();
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual Value serialize(bool explain = false) const;
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- /**
- Create a document source for output and pass-through.
+ // Virtuals for SplittableDocumentSource
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource() {
+ return NULL;
+ }
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() {
+ return this;
+ }
- This can be put anywhere in a pipeline and will store content as
- well as pass it on.
+ const NamespaceString& getOutputNs() const {
+ return _outputNs;
+ }
- @param pBsonElement the raw BSON specification for the source
- @param pExpCtx the expression context for the pipeline
- @returns the newly created document source
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ Create a document source for output and pass-through.
- static const char outName[];
+ This can be put anywhere in a pipeline and will store content as
+ well as pass it on.
- private:
- DocumentSourceOut(const NamespaceString& outputNs,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ @param pBsonElement the raw BSON specification for the source
+ @param pExpCtx the expression context for the pipeline
+ @returns the newly created document source
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- // Sets _tempsNs and prepares it to receive data.
- void prepTempCollection();
+ static const char outName[];
- void spill(const std::vector<BSONObj>& toInsert);
+private:
+ DocumentSourceOut(const NamespaceString& outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- bool _done;
+ // Sets _tempNs and prepares it to receive data.
+ void prepTempCollection();
- NamespaceString _tempNs; // output goes here as it is being processed.
- const NamespaceString _outputNs; // output will go here after all data is processed.
- };
+ void spill(const std::vector<BSONObj>& toInsert);
+ bool _done;
- class DocumentSourceProject : public DocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual boost::intrusive_ptr<DocumentSource> optimize();
- virtual Value serialize(bool explain = false) const;
+ NamespaceString _tempNs; // output goes here as it is being processed.
+ const NamespaceString _outputNs; // output will go here after all data is processed.
+};
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- /**
- Create a new projection DocumentSource from BSON.
+class DocumentSourceProject : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
+ virtual Value serialize(bool explain = false) const;
- This is a convenience for directly handling BSON, and relies on the
- above methods.
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- @param pBsonElement the BSONElement with an object named $project
- @param pExpCtx the expression context for the pipeline
- @returns the created projection
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ Create a new projection DocumentSource from BSON.
- static const char projectName[];
+ This is a convenience for directly handling BSON, and relies on the
+ above methods.
- /** projection as specified by the user */
- BSONObj getRaw() const { return _raw; }
+ @param pBsonElement the BSONElement with an object named $project
+ @param pExpCtx the expression context for the pipeline
+ @returns the created projection
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- private:
- DocumentSourceProject(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- const boost::intrusive_ptr<ExpressionObject>& exprObj);
+ static const char projectName[];
- // configuration state
- std::unique_ptr<Variables> _variables;
- boost::intrusive_ptr<ExpressionObject> pEO;
- BSONObj _raw;
- };
+ /** projection as specified by the user */
+ BSONObj getRaw() const {
+ return _raw;
+ }
- class DocumentSourceRedact :
- public DocumentSource {
- public:
- virtual boost::optional<Document> getNext();
- virtual const char* getSourceName() const;
- virtual boost::intrusive_ptr<DocumentSource> optimize();
+private:
+ DocumentSourceProject(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<ExpressionObject>& exprObj);
- static const char redactName[];
+ // configuration state
+ std::unique_ptr<Variables> _variables;
+ boost::intrusive_ptr<ExpressionObject> pEO;
+ BSONObj _raw;
+};
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+class DocumentSourceRedact : public DocumentSource {
+public:
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
- virtual Value serialize(bool explain = false) const;
+ static const char redactName[];
- private:
- DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<Expression>& previsit);
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- // These both work over _variables
- boost::optional<Document> redactObject(); // redacts CURRENT
- Value redactValue(const Value& in);
+ virtual Value serialize(bool explain = false) const;
- Variables::Id _currentId;
- std::unique_ptr<Variables> _variables;
- boost::intrusive_ptr<Expression> _expression;
- };
+private:
+ DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& previsit);
- class DocumentSourceSort : public DocumentSource
- , public SplittableDocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual void serializeToArray(std::vector<Value>& array, bool explain = false) const;
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource> &pNextSource);
- virtual void dispose();
+ // These both work over _variables
+ boost::optional<Document> redactObject(); // redacts CURRENT
+ Value redactValue(const Value& in);
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
+ Variables::Id _currentId;
+ std::unique_ptr<Variables> _variables;
+ boost::intrusive_ptr<Expression> _expression;
+};
- virtual boost::intrusive_ptr<DocumentSource> getShardSource();
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
+class DocumentSourceSort : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual void serializeToArray(std::vector<Value>& array, bool explain = false) const;
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
+ virtual void dispose();
- /**
- Add sort key field.
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- Adds a sort key field to the key being built up. A concatenated
- key is built up by calling this repeatedly.
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource();
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
- @param fieldPath the field path to the key component
- @param ascending if true, use the key for an ascending sort,
- otherwise, use it for descending
- */
- void addKey(const std::string &fieldPath, bool ascending);
+ /**
+ Add sort key field.
- /// Write out a Document whose contents are the sort key.
- Document serializeSortKey(bool explain) const;
+ Adds a sort key field to the key being built up. A concatenated
+ key is built up by calling this repeatedly.
- /**
- Create a sorting DocumentSource from BSON.
+ @param fieldPath the field path to the key component
+ @param ascending if true, use the key for an ascending sort,
+ otherwise, use it for descending
+ */
+ void addKey(const std::string& fieldPath, bool ascending);
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $group.
+ /// Write out a Document whose contents are the sort key.
+ Document serializeSortKey(bool explain) const;
- @param pBsonElement the BSONELement that defines the group
- @param pExpCtx the expression context for the pipeline
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ Create a sorting DocumentSource from BSON.
- /// Create a DocumentSourceSort with a given sort and (optional) limit
- static boost::intrusive_ptr<DocumentSourceSort> create(
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx,
- BSONObj sortOrder,
- long long limit=-1);
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been determined to be an Object with an
+ element named $sort.
- /// returns -1 for no limit
- long long getLimit() const;
+ @param pBsonElement the BSONElement that defines the sort
+ @param pExpCtx the expression context for the pipeline
+ @returns the sorting DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const { return limitSrc; }
+ /// Create a DocumentSourceSort with a given sort and (optional) limit
+ static boost::intrusive_ptr<DocumentSourceSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ BSONObj sortOrder,
+ long long limit = -1);
- static const char sortName[];
+ /// returns -1 for no limit
+ long long getLimit() const;
- private:
- DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
+ return limitSrc;
+ }
- virtual Value serialize(bool explain = false) const {
- verify(false); // should call addToBsonArray instead
- }
+ static const char sortName[];
- /*
- Before returning anything, this source must fetch everything from
- the underlying source and group it. populate() is used to do that
- on the first call to any method on this source. The populated
- boolean indicates that this has been done.
- */
- void populate();
- bool populated;
-
- SortOptions makeSortOptions() const;
-
- // These are used to merge pre-sorted results from a DocumentSourceMergeCursors or a
- // DocumentSourceCommandShards depending on whether we have finished upgrading to 2.6 or
- // not.
- class IteratorFromCursor;
- class IteratorFromBsonArray;
- void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
- void populateFromBsonArrays(const std::vector<BSONArray>& arrays);
-
- /* these two parallel each other */
- typedef std::vector<boost::intrusive_ptr<Expression> > SortKey;
- SortKey vSortKey;
- std::vector<char> vAscending; // used like std::vector<bool> but without specialization
-
- /// Extracts the fields in vSortKey from the Document;
- Value extractKey(const Document& d) const;
-
- /// Compare two Values according to the specified sort key.
- int compare(const Value& lhs, const Value& rhs) const;
-
- typedef Sorter<Value, Document> MySorter;
-
- // For MySorter
- class Comparator {
- public:
- explicit Comparator(const DocumentSourceSort& source): _source(source) {}
- int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
- return _source.compare(lhs.first, rhs.first);
- }
- private:
- const DocumentSourceSort& _source;
- };
-
- boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
-
- bool _done;
- bool _mergingPresorted;
- std::unique_ptr<MySorter::Iterator> _output;
- };
+private:
+ DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- class DocumentSourceLimit : public DocumentSource
- , public SplittableDocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource> &pNextSource);
- virtual Value serialize(bool explain = false) const;
-
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
- return SEE_NEXT; // This doesn't affect needed fields
- }
+ virtual Value serialize(bool explain = false) const {
+ verify(false); // should call addToBsonArray instead
+ }
- /**
- Create a new limiting DocumentSource.
+ /*
+ Before returning anything, this source must fetch everything from
+ the underlying source and sort it. populate() is used to do that
+ on the first call to any method on this source. The populated
+ boolean indicates that this has been done.
+ */
+ void populate();
+ bool populated;
- @param pExpCtx the expression context for the pipeline
- @returns the DocumentSource
- */
- static boost::intrusive_ptr<DocumentSourceLimit> create(
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx,
- long long limit);
+ SortOptions makeSortOptions() const;
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Running on shard as well is an optimization.
- virtual boost::intrusive_ptr<DocumentSource> getShardSource() { return this; }
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource() { return this; }
+ // These are used to merge pre-sorted results from a DocumentSourceMergeCursors or a
+ // DocumentSourceCommandShards depending on whether we have finished upgrading to 2.6 or
+ // not.
+ class IteratorFromCursor;
+ class IteratorFromBsonArray;
+ void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
+ void populateFromBsonArrays(const std::vector<BSONArray>& arrays);
- long long getLimit() const { return limit; }
- void setLimit(long long newLimit) { limit = newLimit; }
+ /* these two parallel each other */
+ typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
+ SortKey vSortKey;
+ std::vector<char> vAscending; // used like std::vector<bool> but without specialization
- /**
- Create a limiting DocumentSource from BSON.
+ /// Extracts the fields in vSortKey from the Document;
+ Value extractKey(const Document& d) const;
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $limit.
+ /// Compare two Values according to the specified sort key.
+ int compare(const Value& lhs, const Value& rhs) const;
- @param pBsonElement the BSONELement that defines the limit
- @param pExpCtx the expression context
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ typedef Sorter<Value, Document> MySorter;
- static const char limitName[];
+ // For MySorter
+ class Comparator {
+ public:
+ explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
+ int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
+ return _source.compare(lhs.first, rhs.first);
+ }
private:
- DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext> &pExpCtx,
- long long limit);
-
- long long limit;
- long long count;
+ const DocumentSourceSort& _source;
};
- class DocumentSourceSkip : public DocumentSource
- , public SplittableDocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource> &pNextSource);
- virtual Value serialize(bool explain = false) const;
- virtual boost::intrusive_ptr<DocumentSource> optimize();
-
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
- return SEE_NEXT; // This doesn't affect needed fields
- }
-
- /**
- Create a new skipping DocumentSource.
+ boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
- @param pExpCtx the expression context
- @returns the DocumentSource
- */
- static boost::intrusive_ptr<DocumentSourceSkip> create(
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ bool _done;
+ bool _mergingPresorted;
+ std::unique_ptr<MySorter::Iterator> _output;
+};
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Can't run on shards.
- virtual boost::intrusive_ptr<DocumentSource> getShardSource() { return NULL; }
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource() { return this; }
+class DocumentSourceLimit : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
+ virtual Value serialize(bool explain = false) const;
- long long getSkip() const { return _skip; }
- void setSkip(long long newSkip) { _skip = newSkip; }
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
+ return SEE_NEXT; // This doesn't affect needed fields
+ }
- /**
- Create a skipping DocumentSource from BSON.
+ /**
+ Create a new limiting DocumentSource.
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $skip.
+ @param pExpCtx the expression context for the pipeline
+ @returns the DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSourceLimit> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
+
+ // Virtuals for SplittableDocumentSource
+ // Need to run on router. Running on shard as well is an optimization.
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource() {
+ return this;
+ }
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() {
+ return this;
+ }
+
+ long long getLimit() const {
+ return limit;
+ }
+ void setLimit(long long newLimit) {
+ limit = newLimit;
+ }
- @param pBsonElement the BSONELement that defines the skip
- @param pExpCtx the expression context
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ /**
+ Create a limiting DocumentSource from BSON.
- static const char skipName[];
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been determined to be an Object with an
+ element named $limit.
- private:
- DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ @param pBsonElement the BSONElement that defines the limit
+ @param pExpCtx the expression context
+ @returns the limiting DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- long long _skip;
- bool _needToSkip;
- };
+ static const char limitName[];
+private:
+ DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
- class DocumentSourceUnwind :
- public DocumentSource {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual Value serialize(bool explain = false) const;
+ long long limit;
+ long long count;
+};
- virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
+class DocumentSourceSkip : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
+ virtual Value serialize(bool explain = false) const;
+ virtual boost::intrusive_ptr<DocumentSource> optimize();
- /**
- Create a new projection DocumentSource from BSON.
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const {
+ return SEE_NEXT; // This doesn't affect needed fields
+ }
- This is a convenience for directly handling BSON, and relies on the
- above methods.
+ /**
+ Create a new skipping DocumentSource.
- @param pBsonElement the BSONElement with an object named $project
- @param pExpCtx the expression context for the pipeline
- @returns the created projection
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ @param pExpCtx the expression context
+ @returns the DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSourceSkip> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ // Virtuals for SplittableDocumentSource
+ // Needs to run on the router; can't run on shards.
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource() {
+ return NULL;
+ }
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource() {
+ return this;
+ }
+
+ long long getSkip() const {
+ return _skip;
+ }
+ void setSkip(long long newSkip) {
+ _skip = newSkip;
+ }
- static const char unwindName[];
+ /**
+ Create a skipping DocumentSource from BSON.
- private:
- DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been determined to be an Object with an
+ element named $skip.
- /** Specify the field to unwind. */
- void unwindPath(const FieldPath &fieldPath);
+ @param elem the BSONElement that defines the skip
+ @param pExpCtx the expression context
+ @returns the skipping DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- // Configuration state.
- std::unique_ptr<FieldPath> _unwindPath;
+ static const char skipName[];
- // Iteration state.
- class Unwinder;
- std::unique_ptr<Unwinder> _unwinder;
- };
+private:
+ DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- class DocumentSourceGeoNear : public DocumentSource
- , public SplittableDocumentSource
- , public DocumentSourceNeedsMongod {
- public:
- // virtuals from DocumentSource
- virtual boost::optional<Document> getNext();
- virtual const char *getSourceName() const;
- virtual void setSource(DocumentSource *pSource);
- virtual bool coalesce(const boost::intrusive_ptr<DocumentSource> &pNextSource);
- virtual bool isValidInitialSource() const { return true; }
- virtual Value serialize(bool explain = false) const;
+ long long _skip;
+ bool _needToSkip;
+};
- // Virtuals for SplittableDocumentSource
- virtual boost::intrusive_ptr<DocumentSource> getShardSource();
- virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem,
- const boost::intrusive_ptr<ExpressionContext> &pCtx);
+class DocumentSourceUnwind : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual Value serialize(bool explain = false) const;
- static char geoNearName[];
+ virtual GetDepsReturn getDependencies(DepsTracker* deps) const;
- long long getLimit() { return limit; }
+ /**
+ Create a new unwinding DocumentSource from BSON.
- // this should only be used for testing
- static boost::intrusive_ptr<DocumentSourceGeoNear> create(
- const boost::intrusive_ptr<ExpressionContext> &pCtx);
+ This is a convenience for directly handling BSON, and relies on the
+ above methods.
- private:
- DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext> &pExpCtx);
-
- void parseOptions(BSONObj options);
- BSONObj buildGeoNearCmd() const;
- void runCommand();
-
- // These fields describe the command to run.
- // coords and distanceField are required, rest are optional
- BSONObj coords; // "near" option, but near is a reserved keyword on windows
- bool coordsIsArray;
- std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty
- long long limit;
- double maxDistance;
- double minDistance;
- BSONObj query;
- bool spherical;
- double distanceMultiplier;
- std::unique_ptr<FieldPath> includeLocs;
-
- // these fields are used while processing the results
- BSONObj cmdOutput;
- std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"]
- };
+ @param elem the BSONElement that defines the unwind
+ @param pExpCtx the expression context for the pipeline
+ @returns the created unwind DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static const char unwindName[];
+
+private:
+ DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /** Specify the field to unwind. */
+ void unwindPath(const FieldPath& fieldPath);
+
+ // Configuration state.
+ std::unique_ptr<FieldPath> _unwindPath;
+
+ // Iteration state.
+ class Unwinder;
+ std::unique_ptr<Unwinder> _unwinder;
+};
+
+class DocumentSourceGeoNear : public DocumentSource,
+ public SplittableDocumentSource,
+ public DocumentSourceNeedsMongod {
+public:
+ // virtuals from DocumentSource
+ virtual boost::optional<Document> getNext();
+ virtual const char* getSourceName() const;
+ virtual void setSource(DocumentSource* pSource);
+ virtual bool coalesce(const boost::intrusive_ptr<DocumentSource>& pNextSource);
+ virtual bool isValidInitialSource() const {
+ return true;
+ }
+ virtual Value serialize(bool explain = false) const;
+
+ // Virtuals for SplittableDocumentSource
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource();
+ virtual boost::intrusive_ptr<DocumentSource> getMergeSource();
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
+
+ static char geoNearName[];
+
+ long long getLimit() {
+ return limit;
+ }
+
+ // this should only be used for testing
+ static boost::intrusive_ptr<DocumentSourceGeoNear> create(
+ const boost::intrusive_ptr<ExpressionContext>& pCtx);
+
+private:
+ DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ void parseOptions(BSONObj options);
+ BSONObj buildGeoNearCmd() const;
+ void runCommand();
+
+ // These fields describe the command to run.
+ // coords and distanceField are required, rest are optional
+ BSONObj coords; // "near" option, but near is a reserved keyword on windows
+ bool coordsIsArray;
+ std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty
+ long long limit;
+ double maxDistance;
+ double minDistance;
+ BSONObj query;
+ bool spherical;
+ double distanceMultiplier;
+ std::unique_ptr<FieldPath> includeLocs;
+
+ // these fields are used while processing the results
+ BSONObj cmdOutput;
+ std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"]
+};
}