diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-10-21 17:06:31 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-10-24 11:47:57 -0400 |
commit | 4fb763d868070414ca0af76b84ff8937b3773832 (patch) | |
tree | 54b3e49dc04a483759dd7042bda1d4e97cec581f /src/mongo/db/pipeline | |
parent | 7f4670f708361c6da29ba5503e000964a035fe6f (diff) | |
download | mongo-4fb763d868070414ca0af76b84ff8937b3773832.tar.gz |
SERVER-22632 Split up document_source.h into one header per stage.
Diffstat (limited to 'src/mongo/db/pipeline')
80 files changed, 2859 insertions, 1839 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index a78c307fad3..b5e5309d9e6 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -28,9 +28,12 @@ #include "mongo/platform/basic.h" -#include "mongo/db/matcher/expression_algo.h" #include "mongo/db/pipeline/document_source.h" + +#include "mongo/db/matcher/expression_algo.h" +#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/value.h" #include "mongo/util/string_map.h" diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index d5d3c02f07b..81ba3fc61a6 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -30,50 +30,32 @@ #include "mongo/platform/basic.h" +#include <boost/intrusive_ptr.hpp> #include <boost/optional.hpp> -#include <deque> #include <list> +#include <memory> #include <string> -#include <utility> #include <vector> #include "mongo/base/init.h" #include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/client/connpool.h" -#include "mongo/db/clientcursor.h" +#include "mongo/client/dbclientinterface.h" #include "mongo/db/collection_index_usage_tracker.h" #include "mongo/db/jsobj.h" -#include "mongo/db/matcher/matcher.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/pipeline/accumulation_statement.h" -#include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/granularity_rounder.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/db/pipeline/lookup_set_cache.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/value.h" -#include "mongo/db/pipeline/value_comparator.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/sorter/sorter.h" #include "mongo/stdx/functional.h" -#include "mongo/stdx/unordered_map.h" #include "mongo/util/intrusive_counter.h" namespace mongo { class AggregationRequest; class Document; -class Expression; -class ExpressionFieldPath; -class ExpressionObject; -class DocumentSourceLimit; -class DocumentSourceSort; -class PlanExecutor; -class RecordCursor; /** * Registers a DocumentSource to have the name 'key'. @@ -655,1775 +637,5 @@ protected: std::shared_ptr<MongodInterface> _mongod; }; -/** - * Constructs and returns Documents from the BSONObj objects produced by a supplied - * PlanExecutor. - * - * An object of this type may only be used by one thread, see SERVER-6123. - */ -class DocumentSourceCursor final : public DocumentSource { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - BSONObjSet getOutputSorts() final { - return _outputSorts; - } - /** - * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - Value serialize(bool explain = false) const final; - bool isValidInitialSource() const final { - return true; - } - void dispose() final; - - void detachFromOperationContext() final; - - void reattachToOperationContext(OperationContext* opCtx) final; - - /** - * Create a document source based on a passed-in PlanExecutor. - * - * This is usually put at the beginning of a chain of document sources - * in order to fetch data from the database. - */ - static boost::intrusive_ptr<DocumentSourceCursor> create( - const std::string& ns, - std::unique_ptr<PlanExecutor> exec, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /* - Record the query that was specified for the cursor this wraps, if - any. - - This should be captured after any optimizations are applied to - the pipeline so that it reflects what is really used. - - This gets used for explain output. - - @param pBsonObj the query to record - */ - void setQuery(const BSONObj& query) { - _query = query; - } - - /* - Record the sort that was specified for the cursor this wraps, if - any. - - This should be captured after any optimizations are applied to - the pipeline so that it reflects what is really used. - - This gets used for explain output. - - @param pBsonObj the sort to record - */ - void setSort(const BSONObj& sort) { - _sort = sort; - } - - /** - * Informs this object of projection and dependency information. - * - * @param projection The projection that has been passed down to the query system. - * @param deps The output of DepsTracker::toParsedDeps. - */ - void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps); - - /// returns -1 for no limit - long long getLimit() const; - - /** - * If subsequent sources need no information from the cursor, the cursor can simply output empty - * documents, avoiding the overhead of converting BSONObjs to Documents. - */ - void shouldProduceEmptyDocs() { - _shouldProduceEmptyDocs = true; - } - - const std::string& getPlanSummaryStr() const; - - const PlanSummaryStats& getPlanSummaryStats() const; - -protected: - void doInjectExpressionContext() final; - -private: - DocumentSourceCursor(const std::string& ns, - std::unique_ptr<PlanExecutor> exec, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - void loadBatch(); - - void recordPlanSummaryStr(); - - void recordPlanSummaryStats(); - - std::deque<Document> _currentBatch; - - // BSONObj members must outlive _projection and cursor. - BSONObj _query; - BSONObj _sort; - BSONObj _projection; - bool _shouldProduceEmptyDocs = false; - boost::optional<ParsedDeps> _dependencies; - boost::intrusive_ptr<DocumentSourceLimit> _limit; - long long _docsAddedToBatches; // for _limit enforcement - - const std::string _ns; - std::unique_ptr<PlanExecutor> _exec; - BSONObjSet _outputSorts; - std::string _planSummary; - PlanSummaryStats _planSummaryStats; -}; - - -class DocumentSourceGroup final : public DocumentSource, public SplittableDocumentSource { -public: - using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>; - using GroupsMap = ValueUnorderedMap<Accumulators>; - - static const size_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; - - // Virtuals from DocumentSource. - boost::intrusive_ptr<DocumentSource> optimize() final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; - Value serialize(bool explain = false) const final; - GetNextResult getNext() final; - void dispose() final; - const char* getSourceName() const final; - BSONObjSet getOutputSorts() final; - - /** - * Convenience method for creating a new $group stage. - */ - static boost::intrusive_ptr<DocumentSourceGroup> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::intrusive_ptr<Expression>& groupByExpression, - std::vector<AccumulationStatement> accumulationStatements, - Variables::Id numVariables, - size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); - - /** - * Parses 'elem' into a $group stage, or throws a UserException if 'elem' was an invalid - * specification. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** - * Add an accumulator, which will become a field in each Document that results from grouping. - */ - void addAccumulator(AccumulationStatement accumulationStatement); - - /** - * Sets the expression to use to determine the group id of each document. - */ - void setIdExpression(const boost::intrusive_ptr<Expression> idExpression); - - /** - * Tell this source if it is doing a merge from shards. Defaults to false. - */ - void setDoingMerge(bool doingMerge) { - _doingMerge = doingMerge; - } - - bool isStreaming() const { - return _streaming; - } - - // Virtuals for SplittableDocumentSource. - boost::intrusive_ptr<DocumentSource> getShardSource() final; - boost::intrusive_ptr<DocumentSource> getMergeSource() final; - -protected: - void doInjectExpressionContext() final; - -private: - explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); - - /** - * getNext() dispatches to one of these three depending on what type of $group it is. All three - * of these methods expect '_currentAccumulators' to have been reset before being called, and - * also expect initialize() to have been called already. - */ - GetNextResult getNextStreaming(); - GetNextResult getNextSpilled(); - GetNextResult getNextStandard(); - - /** - * Attempt to identify an input sort order that allows us to turn into a streaming $group. If we - * find one, return it. Otherwise, return boost::none. - */ - boost::optional<BSONObj> findRelevantInputSort() const; - - /** - * Before returning anything, this source must prepare itself. In a streaming $group, - * initialize() requests the first document from the previous source, and uses it to prepare the - * accumulators. In an unsorted $group, initialize() exhausts the previous source before - * returning. The '_initialized' boolean indicates that initialize() has finished. - * - * This method may not be able to finish initialization in a single call if 'pSource' returns a - * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult - * encountered, which may be either kEOF or kPauseExecution. - */ - GetNextResult initialize(); - - /** - * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group - * does not exhaust the previous stage before returning, and thus does not maintain as large a - * store of documents at any one time, only an unsorted group can spill to disk. - */ - std::shared_ptr<Sorter<Value, Value>::Iterator> spill(); - - Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); - - /** - * Computes the internal representation of the group key. - */ - Value computeId(Variables* vars); - - /** - * Converts the internal representation of the group key to the _id shape specified by the - * user. - */ - Value expandId(const Value& val); - - /** - * 'vFieldName' contains the field names for the result documents, 'vpAccumulatorFactory' - * contains the accumulator factories for the result documents, and 'vpExpression' contains the - * common expressions used by each instance of each accumulator in order to find the right-hand - * side of what gets added to the accumulator. These three vectors parallel each other. - */ - std::vector<std::string> vFieldName; - std::vector<Accumulator::Factory> vpAccumulatorFactory; - std::vector<boost::intrusive_ptr<Expression>> vpExpression; - - bool _doingMerge; - size_t _memoryUsageBytes = 0; - size_t _maxMemoryUsageBytes; - std::unique_ptr<Variables> _variables; - std::vector<std::string> _idFieldNames; // used when id is a document - std::vector<boost::intrusive_ptr<Expression>> _idExpressions; - - BSONObj _inputSort; - bool _streaming; - bool _initialized; - - Value _currentId; - Accumulators _currentAccumulators; - - // We use boost::optional to defer initialization until the ExpressionContext containing the - // correct comparator is injected, since the groups must be built using the comparator's - // definition of equality. - boost::optional<GroupsMap> _groups; - - std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles; - bool _spilled; - - // Only used when '_spilled' is false. - GroupsMap::iterator groupsIterator; - - // Only used when '_spilled' is true. - std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - const bool _extSortAllowed; - - std::pair<Value, Value> _firstPartOfNextGroup; - // Only used when '_sorted' is true. - boost::optional<Document> _firstDocOfNextGroup; -}; - -/** - * Provides a document source interface to retrieve index statistics for a given namespace. - * Each document returned represents a single index and mongod instance. - */ -class DocumentSourceIndexStats final : public DocumentSourceNeedsMongod { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - - virtual bool isValidInitialSource() const final { - return true; - } - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - CollectionIndexUsageMap _indexStatsMap; - CollectionIndexUsageMap::const_iterator _indexStatsIter; - std::string _processName; -}; - -class DocumentSourceMatch final : public DocumentSource { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - boost::intrusive_ptr<DocumentSource> optimize() final; - BSONObjSet getOutputSorts() final { - return pSource ? pSource->getOutputSorts() - : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - } - - /** - * Attempts to combine with any subsequent $match stages, joining the query objects with a - * $and. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - void setSource(DocumentSource* Source) final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final; - - /** - * Convenience method for creating a $match stage. - */ - static boost::intrusive_ptr<DocumentSourceMatch> create( - BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); - /** - * Parses a $match stage from 'elem'. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx); - - /** - * Access the MatchExpression stored inside the DocumentSourceMatch. Does not release ownership. - */ - MatchExpression* getMatchExpression() const { - return _expression.get(); - } - - /** - * Combines the filter in this $match with the filter of 'other' using a $and, updating this - * match in place. - */ - void joinMatchWith(boost::intrusive_ptr<DocumentSourceMatch> other); - - /** - * Returns the query in MatchExpression syntax. - */ - BSONObj getQuery() const; - - /** Returns the portion of the match that can safely be promoted to before a $redact. - * If this returns an empty BSONObj, no part of this match may safely be promoted. - * - * To be safe to promote, removing a field from a document to be matched must not cause - * that document to be accepted when it would otherwise be rejected. As an example, - * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that - * running this filter before a redact that would remove the name field would leak - * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents - * that have had their age field removed. - */ - BSONObj redactSafePortion() const; - - static bool isTextQuery(const BSONObj& query); - bool isTextQuery() const { - return _isTextQuery; - } - - /** - * Attempt to split this $match into two stages, where the first is not dependent upon any path - * from 'fields', and where applying them in sequence is equivalent to applying this stage once. - * - * Will return two intrusive_ptrs to new $match stages, where the first pointer is independent - * of 'fields', and the second is dependent. Either pointer may be null, so be sure to check the - * return value. - * - * For example, {$match: {a: "foo", "b.c": 4}} split by "b" will return pointers to two stages: - * {$match: {a: "foo"}}, and {$match: {"b.c": 4}}. - */ - std::pair<boost::intrusive_ptr<DocumentSourceMatch>, boost::intrusive_ptr<DocumentSourceMatch>> - splitSourceBy(const std::set<std::string>& fields); - - /** - * Given a document 'input', extract 'fields' and produce a BSONObj with those values. - */ - static BSONObj getObjectForMatch(const Document& input, const std::set<std::string>& fields); - - /** - * Should be called _only_ on a MatchExpression that is a predicate on 'path', or subfields of - * 'path'. It is also invalid to call this method on a $match including a $elemMatch on 'path', - * for example: {$match: {'path': {$elemMatch: {'subfield': 3}}}} - * - * Returns a new DocumentSourceMatch that, if executed on the subdocument at 'path', is - * equivalent to 'expression'. - * - * For example, if the original expression is {$and: [{'a.b': {$gt: 0}}, {'a.d': {$eq: 3}}]}, - * the new $match will have the expression {$and: [{b: {$gt: 0}}, {d: {$eq: 3}}]} after - * descending on the path 'a'. - */ - static boost::intrusive_ptr<DocumentSourceMatch> descendMatchOnPath( - MatchExpression* matchExpr, - const std::string& path, - boost::intrusive_ptr<ExpressionContext> expCtx); - - void doInjectExpressionContext(); - -private: - DocumentSourceMatch(const BSONObj& query, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - void addDependencies(DepsTracker* deps) const; - - std::unique_ptr<MatchExpression> _expression; - - // Cache the dependencies so that we know what fields we need to serialize to BSON for matching. - DepsTracker _dependencies; - - BSONObj _predicate; - bool _isTextQuery; -}; - -class DocumentSourceMergeCursors : public DocumentSource { -public: - struct CursorDescriptor { - CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) - : connectionString(std::move(connectionString)), - ns(std::move(ns)), - cursorId(cursorId) {} - - ConnectionString connectionString; - std::string ns; - CursorId cursorId; - }; - - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - void dispose() final; - Value serialize(bool explain = false) const final; - bool isValidInitialSource() const final { - return true; - } - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - static boost::intrusive_ptr<DocumentSource> create( - std::vector<CursorDescriptor> cursorDescriptors, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** Returns non-owning pointers to cursors managed by this stage. - * Call this instead of getNext() if you want access to the raw streams. - * This method should only be called at most once. - */ - std::vector<DBClientCursor*> getCursors(); - - /** - * Returns the next object from the cursor, throwing an appropriate exception if the cursor - * reported an error. This is a better form of DBClientCursor::nextSafe. - */ - static Document nextSafeFrom(DBClientCursor* cursor); - -private: - struct CursorAndConnection { - CursorAndConnection(const CursorDescriptor& cursorDescriptor); - ScopedDbConnection connection; - DBClientCursor cursor; - }; - - // using list to enable removing arbitrary elements - typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors; - - DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - // Converts _cursorDescriptors into active _cursors. - void start(); - - // This is the description of cursors to merge. - const std::vector<CursorDescriptor> _cursorDescriptors; - - // These are the actual cursors we are merging. Created lazily. - Cursors _cursors; - Cursors::iterator _currentCursor; - - bool _unstarted; -}; - -/** - * Used in testing to store documents without using the storage layer. Methods are not marked as - * final in order to allow tests to intercept calls if needed. - */ -class DocumentSourceMock : public DocumentSource { -public: - DocumentSourceMock(std::deque<GetNextResult> results); - DocumentSourceMock(std::deque<GetNextResult> results, - const boost::intrusive_ptr<ExpressionContext>& expCtx); - - GetNextResult getNext() override; - const char* getSourceName() const override; - Value serialize(bool explain = false) const override; - void dispose() override; - bool isValidInitialSource() const override { - return true; - } - BSONObjSet getOutputSorts() override { - return sorts; - } - - static boost::intrusive_ptr<DocumentSourceMock> create(); - - static boost::intrusive_ptr<DocumentSourceMock> create(Document doc); - - static boost::intrusive_ptr<DocumentSourceMock> create(const GetNextResult& result); - static boost::intrusive_ptr<DocumentSourceMock> create(std::deque<GetNextResult> results); - - static boost::intrusive_ptr<DocumentSourceMock> create(const char* json); - static boost::intrusive_ptr<DocumentSourceMock> create( - const std::initializer_list<const char*>& jsons); - - void reattachToOperationContext(OperationContext* opCtx) { - isDetachedFromOpCtx = false; - } - - void detachFromOperationContext() { - isDetachedFromOpCtx = true; - } - - boost::intrusive_ptr<DocumentSource> optimize() override { - isOptimized = true; - return this; - } - - void doInjectExpressionContext() override { - isExpCtxInjected = true; - } - - // Return documents from front of queue. - std::deque<GetNextResult> queue; - - bool isDisposed = false; - bool isDetachedFromOpCtx = false; - bool isOptimized = false; - bool isExpCtxInjected = false; - - BSONObjSet sorts; -}; - -/** - * This class is for DocumentSources that take in and return one document at a time, in a 1:1 - * transformation. It should only be used via an alias that passes the transformation logic through - * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be - * created from BSON. - */ -class DocumentSourceSingleDocumentTransformation final : public DocumentSource { -public: - /** - * This class defines the minimal interface that every parser wishing to take advantage of - * DocumentSourceSingleDocumentTransformation must implement. - * - * This interface ensures that DocumentSourceSingleDocumentTransformations are passed parsed - * objects that can execute the transformation and provide additional features like - * serialization and reporting and returning dependencies. The parser must also provide - * implementations for optimizing and adding the expression context, even if those functions do - * nothing. - */ - class TransformerInterface { - public: - virtual ~TransformerInterface() = default; - virtual Document applyTransformation(Document input) = 0; - virtual void optimize() = 0; - virtual Document serialize(bool explain) const = 0; - virtual DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const = 0; - virtual void injectExpressionContext( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx) = 0; - virtual GetModPathsReturn getModifiedPaths() const = 0; - }; - - DocumentSourceSingleDocumentTransformation( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - std::unique_ptr<TransformerInterface> parsedTransform, - std::string name); - - // virtuals from DocumentSource - const char* getSourceName() const final; - GetNextResult getNext() final; - boost::intrusive_ptr<DocumentSource> optimize() final; - void dispose() final; - Value serialize(bool explain) const final; - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - void doInjectExpressionContext() final; - DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; - GetModPathsReturn getModifiedPaths() const final; - - bool canSwapWithMatch() const final { - return true; - } - -private: - // Stores transformation logic. - std::unique_ptr<TransformerInterface> _parsedTransform; - - // Specific name of the transformation. - std::string _name; -}; - -class DocumentSourceOut final : public DocumentSourceNeedsMongod, public SplittableDocumentSource { -public: - static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( - const AggregationRequest& request, const BSONElement& spec); - - // virtuals from DocumentSource - ~DocumentSourceOut() final; - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; - bool needsPrimaryShard() const final { - return true; - } - - // Virtuals for SplittableDocumentSource - boost::intrusive_ptr<DocumentSource> getShardSource() final { - return NULL; - } - boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; - } - - const NamespaceString& getOutputNs() const { - return _outputNs; - } - - /** - Create a document source for output and pass-through. - - This can be put anywhere in a pipeline and will store content as - well as pass it on. - - @param pBsonElement the raw BSON specification for the source - @param pExpCtx the expression context for the pipeline - @returns the newly created document source - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceOut(const NamespaceString& outputNs, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** - * Sets '_tempNs' to a unique temporary namespace, makes sure the output collection isn't - * sharded or capped, and saves the collection options and indexes of the target collection. - * Then creates the temporary collection we will insert into by copying the collection options - * and indexes from the target collection. - * - * Sets '_initialized' to true upon completion. - */ - void initialize(); - - /** - * Inserts all of 'toInsert' into the temporary collection. - */ - void spill(const std::vector<BSONObj>& toInsert); - - bool _initialized = false; - bool _done = false; - - // Holds on to the original collection options and index specs so we can check they didn't - // change during computation. - BSONObj _originalOutOptions; - std::list<BSONObj> _originalIndexes; - - NamespaceString _tempNs; // output goes here as it is being processed. - const NamespaceString _outputNs; // output will go here after all data is processed. -}; - -class DocumentSourceRedact final : public DocumentSource { -public: - GetNextResult getNext() final; - const char* getSourceName() const final; - boost::intrusive_ptr<DocumentSource> optimize() final; - - /** - * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact - * stage. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - - void doInjectExpressionContext() final; - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - Value serialize(bool explain = false) const final; - -private: - DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::intrusive_ptr<Expression>& previsit); - - // These both work over _variables - boost::optional<Document> redactObject(); // redacts CURRENT - Value redactValue(const Value& in); - - Variables::Id _currentId; - std::unique_ptr<Variables> _variables; - boost::intrusive_ptr<Expression> _expression; -}; - -/* - * $replaceRoot takes an object containing only an expression in the newRoot field, and replaces - * each incoming document with the result of evaluating that expression. Throws an error if the - * given expression is not an object or if the expression evaluates to the "missing" Value. This - * is implemented as an extension of DocumentSourceSingleDocumentTransformation. - */ -class DocumentSourceReplaceRoot final { -public: - /** - * Creates a new replaceRoot DocumentSource from the BSON specification of the $replaceRoot - * stage. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceReplaceRoot() = default; -}; - -class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource { -public: - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final { - return SEE_NEXT; - } - - boost::intrusive_ptr<DocumentSource> getShardSource() final; - boost::intrusive_ptr<DocumentSource> getMergeSource() final; - - long long getSampleSize() const { - return _size; - } - - void doInjectExpressionContext() final; - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - -private: - explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - long long _size; - - // Uses a $sort stage to randomly sort the documents. - boost::intrusive_ptr<DocumentSourceSort> _sortStage; -}; - -/** - * This class is not a registered stage, it is only used as an optimized replacement for $sample - * when the storage engine allows us to use a random cursor. - */ -class DocumentSourceSampleFromRandomCursor final : public DocumentSource { -public: - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; - - void doInjectExpressionContext() final; - - static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - long long size, - std::string idField, - long long collectionSize); - -private: - DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx, - long long size, - std::string idField, - long long collectionSize); - - /** - * Keep asking for documents from the random cursor until it yields a new document. Errors if a - * a document is encountered without a value for '_idField', or if the random cursor keeps - * returning duplicate elements. - */ - GetNextResult getNextNonDuplicateDocument(); - - long long _size; - - // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog. - std::string _idField; - - // Keeps track of the documents that have been returned, since a random cursor is allowed to - // return duplicates. We use boost::optional to defer initialization until the ExpressionContext - // containing the correct comparator is injected. - boost::optional<ValueUnorderedSet> _seenDocs; - - // The approximate number of documents in the collection (includes orphans). - const long long _nDocsInColl; - - // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext() - // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if - // the documents were produced by a top-k random sort. - double _randMetaFieldVal = 1.0; -}; - -class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - BSONObjSet getOutputSorts() final { - return pSource ? pSource->getOutputSorts() - : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - } - - /** - * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - Value serialize(bool explain = false) const final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final { - return SEE_NEXT; // This doesn't affect needed fields - } - - /** - Create a new limiting DocumentSource. - - @param pExpCtx the expression context for the pipeline - @returns the DocumentSource - */ - static boost::intrusive_ptr<DocumentSourceLimit> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); - - // Virtuals for SplittableDocumentSource - // Need to run on rounter. Running on shard as well is an optimization. - boost::intrusive_ptr<DocumentSource> getShardSource() final { - return this; - } - boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; - } - - long long getLimit() const { - return _limit; - } - void setLimit(long long newLimit) { - _limit = newLimit; - } - - /** - Create a limiting DocumentSource from BSON. - - This is a convenience method that uses the above, and operates on - a BSONElement that has been deteremined to be an Object with an - element named $limit. - - @param pBsonElement the BSONELement that defines the limit - @param pExpCtx the expression context - @returns the grouping DocumentSource - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); - - long long _limit; - long long _nReturned = 0; -}; - -class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource { -public: - static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; - - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - void serializeToArray(std::vector<Value>& array, bool explain = false) const final; - - GetModPathsReturn getModifiedPaths() const final { - // A $sort does not modify any paths. - return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}}; - } - - bool canSwapWithMatch() const final { - return true; - } - - BSONObjSet getOutputSorts() final { - return allPrefixes(_sort); - } - - /** - * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - void dispose() final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final; - - boost::intrusive_ptr<DocumentSource> getShardSource() final; - boost::intrusive_ptr<DocumentSource> getMergeSource() final; - - /// Write out a Document whose contents are the sort key. - Document serializeSortKey(bool explain) const; - - /** - * Parses a $sort stage from the user-supplied BSON. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** - * Convenience method for creating a $sort stage. - */ - static boost::intrusive_ptr<DocumentSourceSort> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - BSONObj sortOrder, - long long limit = -1, - uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); - - /** - * Returns -1 for no limit. - */ - long long getLimit() const; - - /** - * Loads a document to be sorted. This can be used to sort a stream of documents that are not - * coming from another DocumentSource. Once all documents have been added, the caller must call - * loadingDone() before using getNext() to receive the documents in sorted order. - */ - void loadDocument(const Document& doc); - - /** - * Signals to the sort stage that there will be no more input documents. It is an error to call - * loadDocument() once this method returns. - */ - void loadingDone(); - - /** - * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that - * have already been sorted. - */ - void populateFromCursors(const std::vector<DBClientCursor*>& cursors); - - bool isPopulated() { - return _populated; - }; - - boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const { - return limitSrc; - } - -private: - explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - Value serialize(bool explain = false) const final { - MONGO_UNREACHABLE; // Should call serializeToArray instead. - } - - /** - * Helper to add a sort key to this stage. - */ - void addKey(StringData fieldPath, bool ascending); - - /** - * Before returning anything, we have to consume all input and sort it. This method consumes all - * input and prepares the sorted stream '_output'. - * - * This method may not be able to finish populating the sorter in a single call if 'pSource' - * returns a DocumentSource::GetNextResult::kPauseExecution, so it returns the last - * GetNextResult encountered, which may be either kEOF or kPauseExecution. - */ - GetNextResult populate(); - bool _populated = false; - - BSONObj _sort; - - SortOptions makeSortOptions() const; - - // This is used to merge pre-sorted results from a DocumentSourceMergeCursors. - class IteratorFromCursor; - - /* these two parallel each other */ - typedef std::vector<boost::intrusive_ptr<Expression>> SortKey; - SortKey vSortKey; - std::vector<char> vAscending; // used like std::vector<bool> but without specialization - - /// Extracts the fields in vSortKey from the Document; - Value extractKey(const Document& d) const; - - /// Compare two Values according to the specified sort key. - int compare(const Value& lhs, const Value& rhs) const; - - typedef Sorter<Value, Document> MySorter; - - /** - * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep - * the smallest limit. - */ - void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) { - if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) { - limitSrc = limit; - } - } - - // For MySorter - class Comparator { - public: - explicit Comparator(const DocumentSourceSort& source) : _source(source) {} - int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const { - return _source.compare(lhs.first, rhs.first); - } - - private: - const DocumentSourceSort& _source; - }; - - boost::intrusive_ptr<DocumentSourceLimit> limitSrc; - - uint64_t _maxMemoryUsageBytes; - bool _done; - bool _mergingPresorted; - std::unique_ptr<MySorter> _sorter; - std::unique_ptr<MySorter::Iterator> _output; -}; - -class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - /** - * Attempts to move a subsequent $limit before the skip, potentially allowing for forther - * optimizations earlier in the pipeline. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - Value serialize(bool explain = false) const final; - boost::intrusive_ptr<DocumentSource> optimize() final; - BSONObjSet getOutputSorts() final { - return pSource ? pSource->getOutputSorts() - : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); - } - - GetDepsReturn getDependencies(DepsTracker* deps) const final { - return SEE_NEXT; // This doesn't affect needed fields - } - - // Virtuals for SplittableDocumentSource - // Need to run on rounter. Can't run on shards. - boost::intrusive_ptr<DocumentSource> getShardSource() final { - return NULL; - } - boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; - } - - long long getSkip() const { - return _nToSkip; - } - void setSkip(long long newSkip) { - _nToSkip = newSkip; - } - - /** - * Convenience method for creating a $skip stage. - */ - static boost::intrusive_ptr<DocumentSourceSkip> create( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); - - /** - * Parses the user-supplied BSON into a $skip stage. - * - * Throws a UserException if 'elem' is an invalid $skip specification. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - long long nToSkip); - - long long _nToSkip = 0; - long long _nSkippedSoFar = 0; -}; - - -class DocumentSourceUnwind final : public DocumentSource { -public: - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(bool explain = false) const final; - BSONObjSet getOutputSorts() final; - - /** - * Returns the unwound path, and the 'includeArrayIndex' path, if specified. - */ - GetModPathsReturn getModifiedPaths() const final; - - bool canSwapWithMatch() const final { - return true; - } - - GetDepsReturn getDependencies(DepsTracker* deps) const final; - - /** - * Creates a new $unwind DocumentSource from a BSON specification. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - static boost::intrusive_ptr<DocumentSourceUnwind> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const std::string& path, - bool includeNullIfEmptyOrMissing, - const boost::optional<std::string>& includeArrayIndex); - - std::string getUnwindPath() const { - return _unwindPath.fullPath(); - } - - bool preserveNullAndEmptyArrays() const { - return _preserveNullAndEmptyArrays; - } - - const boost::optional<FieldPath>& indexPath() const { - return _indexPath; - } - -private: - DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - const FieldPath& fieldPath, - bool includeNullIfEmptyOrMissing, - const boost::optional<FieldPath>& includeArrayIndex); - - // Configuration state. - const FieldPath _unwindPath; - // Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass - // through the $unwind stage unmodified if '_preserveNullAndEmptyArrays' is true. - const bool _preserveNullAndEmptyArrays; - // If set, the $unwind stage will include the array index in the specified path, overwriting any - // existing value, setting to null when the value was a non-array or empty array. - const boost::optional<FieldPath> _indexPath; - - // Iteration state. - class Unwinder; - std::unique_ptr<Unwinder> _unwinder; -}; - -class DocumentSourceGeoNear : public DocumentSourceNeedsMongod, public SplittableDocumentSource { -public: - static const long long kDefaultLimit; - - // virtuals from DocumentSource - GetNextResult getNext() final; - const char* getSourceName() const final; - /** - * Attempts to combine with a subsequent limit stage, setting the internal limit field - * as a result. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - bool isValidInitialSource() const final { - return true; - } - Value serialize(bool explain = false) const final; - BSONObjSet getOutputSorts() final { - return SimpleBSONObjComparator::kInstance.makeBSONObjSet( - {BSON(distanceField->fullPath() << -1)}); - } - - // Virtuals for SplittableDocumentSource - boost::intrusive_ptr<DocumentSource> getShardSource() final; - boost::intrusive_ptr<DocumentSource> getMergeSource() final; - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx); - - static char geoNearName[]; - - long long getLimit() { - return limit; - } - - BSONObj getQuery() const { - return query; - }; - - // this should only be used for testing - static boost::intrusive_ptr<DocumentSourceGeoNear> create( - const boost::intrusive_ptr<ExpressionContext>& pCtx); - -private: - explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - void parseOptions(BSONObj options); - BSONObj buildGeoNearCmd() const; - void runCommand(); - - // These fields describe the command to run. - // coords and distanceField are required, rest are optional - BSONObj coords; // "near" option, but near is a reserved keyword on windows - bool coordsIsArray; - std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty - long long limit; - double maxDistance; - double minDistance; - BSONObj query; - bool spherical; - double distanceMultiplier; - std::unique_ptr<FieldPath> includeLocs; - - // these fields are used while processing the results - BSONObj cmdOutput; - std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"] -}; - -/** - * Queries separate collection for equality matches with documents in the pipeline collection. - * Adds matching documents to a new array field in the input document. - */ -class DocumentSourceLookUp final : public DocumentSourceNeedsMongod, - public SplittableDocumentSource { -public: - static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( - const AggregationRequest& request, const BSONElement& spec); - - GetNextResult getNext() final; - const char* getSourceName() const final; - void serializeToArray(std::vector<Value>& array, bool explain = false) const final; - - /** - * Returns the 'as' path, and possibly fields modified by an absorbed $unwind. - */ - GetModPathsReturn getModifiedPaths() const final; - - bool canSwapWithMatch() const final { - return true; - } - - /** - * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' - * field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; - void dispose() final; - - BSONObjSet getOutputSorts() final { - return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); - } - - bool needsPrimaryShard() const final { - return true; - } - - boost::intrusive_ptr<DocumentSource> getShardSource() final { - return nullptr; - } - - boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; - } - - void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { - collections->push_back(_fromNs); - } - - void doDetachFromOperationContext() final; - - void doReattachToOperationContext(OperationContext* opCtx) final; - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - /** - * Builds the BSONObj used to query the foreign collection and wraps it in a $match. - */ - static BSONObj makeMatchStageFromInput(const Document& input, - const FieldPath& localFieldName, - const std::string& foreignFieldName, - const BSONObj& additionalFilter); - - /** - * Helper to absorb an $unwind stage. Only used for testing this special behavior. - */ - void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) { - invariant(!_handlingUnwind); - _unwindSrc = unwind; - _handlingUnwind = true; - } - -protected: - void doInjectExpressionContext() final; - -private: - DocumentSourceLookUp(NamespaceString fromNs, - std::string as, - std::string localField, - std::string foreignField, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - - Value serialize(bool explain = false) const final { - // Should not be called; use serializeToArray instead. - MONGO_UNREACHABLE; - } - - GetNextResult unwindResult(); - - NamespaceString _fromNs; - FieldPath _as; - FieldPath _localField; - FieldPath _foreignField; - std::string _foreignFieldFieldName; - boost::optional<BSONObj> _additionalFilter; - - // The ExpressionContext used when performing aggregation pipelines against the '_fromNs' - // namespace. - boost::intrusive_ptr<ExpressionContext> _fromExpCtx; - - // The aggregation pipeline to perform against the '_fromNs' namespace. - std::vector<BSONObj> _fromPipeline; - - boost::intrusive_ptr<DocumentSourceMatch> _matchSrc; - boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc; - - bool _handlingUnwind = false; - bool _handlingMatch = false; - - // The following members are used to hold onto state across getNext() calls when - // '_handlingUnwind' is true. - long long _cursorIndex = 0; - boost::intrusive_ptr<Pipeline> _pipeline; - boost::optional<Document> _input; - boost::optional<Document> _nextValue; -}; - -class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongod { -public: - static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( - const AggregationRequest& request, const BSONElement& spec); - - GetNextResult getNext() final; - const char* getSourceName() const final; - void dispose() final; - BSONObjSet getOutputSorts() final; - void serializeToArray(std::vector<Value>& array, bool explain = false) const final; - - /** - * Returns the 'as' path, and possibly the fields modified by an absorbed $unwind. - */ - GetModPathsReturn getModifiedPaths() const final; - - bool canSwapWithMatch() const final { - return true; - } - - /** - * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. - */ - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, - Pipeline::SourceContainer* container) final; - - GetDepsReturn getDependencies(DepsTracker* deps) const final { - _startWith->addDependencies(deps); - return SEE_NEXT; - }; - - bool needsPrimaryShard() const final { - return true; - } - - void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { - collections->push_back(_from); - } - - void doDetachFromOperationContext() final; - - void doReattachToOperationContext(OperationContext* opCtx) final; - - static boost::intrusive_ptr<DocumentSourceGraphLookUp> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - NamespaceString fromNs, - std::string asField, - std::string connectFromField, - std::string connectToField, - boost::intrusive_ptr<Expression> startWith, - boost::optional<BSONObj> additionalFilter, - boost::optional<FieldPath> depthField, - boost::optional<long long> maxDepth, - boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -protected: - void doInjectExpressionContext() final; - -private: - DocumentSourceGraphLookUp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - NamespaceString from, - std::string as, - std::string connectFromField, - std::string connectToField, - boost::intrusive_ptr<Expression> startWith, - boost::optional<BSONObj> additionalFilter, - boost::optional<FieldPath> depthField, - boost::optional<long long> maxDepth, - boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); - - Value serialize(bool explain = false) const final { - // Should not be called; use serializeToArray instead. - MONGO_UNREACHABLE; - } - - /** - * Prepares the query to execute on the 'from' collection wrapped in a $match by using the - * contents of '_frontier'. - * - * Fills 'cached' with any values that were retrieved from the cache. - * - * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache. - * Otherwise, returns a query object. - */ - boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached); - - /** - * If we have internalized a $unwind, getNext() dispatches to this function. - */ - GetNextResult getNextUnwound(); - - /** - * Perform a breadth-first search of the 'from' collection. '_frontier' should already be - * populated with the values for the initial query. Populates '_discovered' with the result(s) - * of the query. - */ - void doBreadthFirstSearch(); - - /** - * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a - * breadth-first search. Caller should check that _input is not boost::none. - */ - void performSearch(); - - /** - * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying - * for 'queried'. - */ - void addToCache(const BSONObj& result, const ValueUnorderedSet& queried); - - /** - * Assert that '_visited' and '_frontier' have not exceeded the maximum meory usage, and then - * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'. - */ - void checkMemoryUsage(); - - /** - * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier' - * with the object's 'connectTo' values. - * - * Returns whether '_visited' was updated, and thus, whether the search should recurse. - */ - bool addToVisitedAndFrontier(BSONObj result, long long depth); - - // $graphLookup options. - NamespaceString _from; - FieldPath _as; - FieldPath _connectFromField; - FieldPath _connectToField; - boost::intrusive_ptr<Expression> _startWith; - boost::optional<BSONObj> _additionalFilter; - boost::optional<FieldPath> _depthField; - boost::optional<long long> _maxDepth; - - // The ExpressionContext used when performing aggregation pipelines against the '_from' - // namespace. - boost::intrusive_ptr<ExpressionContext> _fromExpCtx; - - // The aggregation pipeline to perform against the '_from' namespace. - std::vector<BSONObj> _fromPipeline; - - size_t _maxMemoryUsageBytes = 100 * 1024 * 1024; - - // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'. - size_t _visitedUsageBytes = 0; - size_t _frontierUsageBytes = 0; - - // Only used during the breadth-first search, tracks the set of values on the current frontier. - // We use boost::optional to defer initialization until the ExpressionContext containing the - // correct comparator is injected. - boost::optional<ValueUnorderedSet> _frontier; - - // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the - // document from the foreign collection, value is the document itself. The keys are compared - // using the simple collation. - ValueUnorderedMap<BSONObj> _visited; - - // Caches query results to avoid repeating any work. This structure is maintained across calls - // to getNext(). - LookupSetCache _cache; - - // When we have internalized a $unwind, we must keep track of the input document, since we will - // need it for multiple "getNext()" calls. - boost::optional<Document> _input; - - // The variables that are in scope to be used by the '_startWith' expression. - std::unique_ptr<Variables> _variables; - - // Keep track of a $unwind that was absorbed into this stage. - boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind; - - // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that - // field, tracking how many results we've returned so far for the current input document. - long long _outputIndex; -}; - -class DocumentSourceSortByCount final { -public: - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceSortByCount() = default; -}; - -class DocumentSourceCount final { -public: - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceCount() = default; -}; - -class DocumentSourceBucket final { -public: - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceBucket() = default; -}; - -/** - * The $project stage can be used for simple transformations such as including or excluding a set - * of fields, or can do more sophisticated things, like include some fields and add new "computed" - * fields, using the expression language. Note you can not mix an exclusion-style projection with - * adding or including any other fields. - */ -class DocumentSourceProject final { -public: - /** - * Convenience method to create a $project stage from 'projectSpec'. - */ - static boost::intrusive_ptr<DocumentSource> create( - BSONObj projectSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - /** - * Parses a $project stage from the user-supplied BSON. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceProject() = default; -}; - -/** - * $addFields adds or replaces the specified fields to/in the document while preserving the original - * document. It is modeled on and throws the same errors as $project. - */ -class DocumentSourceAddFields final { -public: - /** - * Convenience method for creating a $addFields stage from 'addFieldsSpec'. - */ - static boost::intrusive_ptr<DocumentSource> create( - BSONObj addFieldsSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - /** - * Parses a $addFields stage from the user-supplied BSON. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - -private: - DocumentSourceAddFields() = default; -}; - -/** - * Provides a document source interface to retrieve collection-level statistics for a given - * collection. - */ -class DocumentSourceCollStats : public DocumentSourceNeedsMongod { -public: - class LiteParsed final : public LiteParsedDocumentSource { - public: - static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, - const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(); - } - - bool isCollStats() const final { - return true; - } - - stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { - return stdx::unordered_set<NamespaceString>(); - } - }; - - DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSourceNeedsMongod(pExpCtx) {} - - GetNextResult getNext() final; - - const char* getSourceName() const final; - - bool isValidInitialSource() const final; - - Value serialize(bool explain = false) const; - - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - // The raw object given to $collStats containing user specified options. - BSONObj _collStatsSpec; - bool _finished = false; -}; - -/** - * The $bucketAuto stage takes a user-specified number of buckets and automatically determines - * boundaries such that the values are approximately equally distributed between those buckets. - */ -class DocumentSourceBucketAuto final : public DocumentSource, public SplittableDocumentSource { -public: - Value serialize(bool explain = false) const final; - GetDepsReturn getDependencies(DepsTracker* deps) const final; - GetNextResult getNext() final; - void dispose() final; - const char* getSourceName() const final; - - /** - * The $bucketAuto stage must be run on the merging shard. - */ - boost::intrusive_ptr<DocumentSource> getShardSource() final { - return nullptr; - } - boost::intrusive_ptr<DocumentSource> getMergeSource() final { - return this; - } - - static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; - - /** - * Convenience method to create a $bucketAuto stage. - * - * If 'accumulationStatements' is the empty vector, it will be filled in with the statement - * 'count: {$sum: 1}'. - */ - static boost::intrusive_ptr<DocumentSourceBucketAuto> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::intrusive_ptr<Expression>& groupByExpression, - Variables::Id numVariables, - int numBuckets, - std::vector<AccumulationStatement> accumulationStatements = {}, - const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr, - uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); - - /** - * Parses a $bucketAuto stage from the user-supplied BSON. - */ - static boost::intrusive_ptr<DocumentSource> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - -private: - DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, - const boost::intrusive_ptr<Expression>& groupByExpression, - Variables::Id numVariables, - int numBuckets, - std::vector<AccumulationStatement> accumulationStatements, - const boost::intrusive_ptr<GranularityRounder>& granularityRounder, - uint64_t maxMemoryUsageBytes); - - // struct for holding information about a bucket. - struct Bucket { - Bucket(Value min, Value max, std::vector<Accumulator::Factory> accumulatorFactories); - Value _min; - Value _max; - std::vector<boost::intrusive_ptr<Accumulator>> _accums; - }; - - /** - * Consumes all of the documents from the source in the pipeline and sorts them by their - * 'groupBy' value. This method might not be able to finish populating the sorter in a single - * call if 'pSource' returns a DocumentSource::GetNextResult::kPauseExecution, so this returns - * the last GetNextResult encountered, which may be either kEOF or kPauseExecution. - */ - GetNextResult populateSorter(); - - /** - * Computes the 'groupBy' expression value for 'doc'. - */ - Value extractKey(const Document& doc); - - /** - * Calculates the bucket boundaries for the input documents and places them into buckets. - */ - void populateBuckets(); - - /** - * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'. - */ - void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket); - - /** - * Adds 'newBucket' to _buckets and updates any boundaries if necessary. - */ - void addBucket(Bucket& newBucket); - - /** - * Makes a document using the information from bucket. This is what is returned when getNext() - * is called. - */ - Document makeDocument(const Bucket& bucket); - - std::unique_ptr<Sorter<Value, Document>> _sorter; - std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput; - - // _fieldNames contains the field names for the result documents, _accumulatorFactories contains - // the accumulator factories for the result documents, and _expressions contains the common - // expressions used by each instance of each accumulator in order to find the right-hand side of - // what gets added to the accumulator. These three vectors parallel each other. - std::vector<std::string> _fieldNames; - std::vector<Accumulator::Factory> _accumulatorFactories; - std::vector<boost::intrusive_ptr<Expression>> _expressions; - - int _nBuckets; - uint64_t _maxMemoryUsageBytes; - bool _populated = false; - std::vector<Bucket> _buckets; - std::vector<Bucket>::iterator _bucketsIterator; - std::unique_ptr<Variables> _variables; - boost::intrusive_ptr<Expression> _groupByExpression; - boost::intrusive_ptr<GranularityRounder> _granularityRounder; - long long _nDocuments = 0; -}; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp index 7f9c2383b58..0f7366538f9 100644 --- a/src/mongo/db/pipeline/document_source_add_fields.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_add_fields.h" #include <boost/optional.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp> diff --git a/src/mongo/db/pipeline/document_source_add_fields.h b/src/mongo/db/pipeline/document_source_add_fields.h new file mode 100644 index 00000000000..b1ce0415ab3 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_add_fields.h @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source_single_document_transformation.h" + +namespace mongo { + +/** + * $addFields adds or replaces the specified fields to/in the document while preserving the original + * document. It is modeled on and throws the same errors as $project. + */ +class DocumentSourceAddFields final { +public: + /** + * Convenience method for creating a $addFields stage from 'addFieldsSpec'. + */ + static boost::intrusive_ptr<DocumentSource> create( + BSONObj addFieldsSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** + * Parses a $addFields stage from the user-supplied BSON. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + +private: + DocumentSourceAddFields() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_add_fields_test.cpp b/src/mongo/db/pipeline/document_source_add_fields_test.cpp index 724c896e4c0..53ff48ae906 100644 --- a/src/mongo/db/pipeline/document_source_add_fields_test.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields_test.cpp @@ -33,6 +33,8 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_add_fields.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp index 484f4b9a62f..05152b7e313 100644 --- a/src/mongo/db/pipeline/document_source_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_bucket.cpp @@ -26,8 +26,11 @@ * it in the license file. */ -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_bucket.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_bucket.h b/src/mongo/db/pipeline/document_source_bucket.h new file mode 100644 index 00000000000..cd7fe31b14e --- /dev/null +++ b/src/mongo/db/pipeline/document_source_bucket.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * The $bucket stage is an alias for a $group stage followed by a $sort stage. + */ +class DocumentSourceBucket final { +public: + /** + * Returns a $group stage followed by a $sort stage. + */ + static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceBucket() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index 31374548175..c9a09354434 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_bucket_auto.h" #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h new file mode 100644 index 00000000000..9279bcd52b9 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/granularity_rounder.h" +#include "mongo/db/sorter/sorter.h" + +namespace mongo { + +/** + * The $bucketAuto stage takes a user-specified number of buckets and automatically determines + * boundaries such that the values are approximately equally distributed between those buckets. + */ +class DocumentSourceBucketAuto final : public DocumentSource, public SplittableDocumentSource { +public: + Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + GetNextResult getNext() final; + void dispose() final; + const char* getSourceName() const final; + + /** + * The $bucketAuto stage must be run on the merging shard. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; + + /** + * Convenience method to create a $bucketAuto stage. + * + * If 'accumulationStatements' is the empty vector, it will be filled in with the statement + * 'count: {$sum: 1}'. + */ + static boost::intrusive_ptr<DocumentSourceBucketAuto> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements = {}, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr, + uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); + + /** + * Parses a $bucketAuto stage from the user-supplied BSON. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + Variables::Id numVariables, + int numBuckets, + std::vector<AccumulationStatement> accumulationStatements, + const boost::intrusive_ptr<GranularityRounder>& granularityRounder, + uint64_t maxMemoryUsageBytes); + + // struct for holding information about a bucket. + struct Bucket { + Bucket(Value min, Value max, std::vector<Accumulator::Factory> accumulatorFactories); + Value _min; + Value _max; + std::vector<boost::intrusive_ptr<Accumulator>> _accums; + }; + + /** + * Consumes all of the documents from the source in the pipeline and sorts them by their + * 'groupBy' value. This method might not be able to finish populating the sorter in a single + * call if 'pSource' returns a DocumentSource::GetNextResult::kPauseExecution, so this returns + * the last GetNextResult encountered, which may be either kEOF or kPauseExecution. + */ + GetNextResult populateSorter(); + + /** + * Computes the 'groupBy' expression value for 'doc'. + */ + Value extractKey(const Document& doc); + + /** + * Calculates the bucket boundaries for the input documents and places them into buckets. + */ + void populateBuckets(); + + /** + * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'. + */ + void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket); + + /** + * Adds 'newBucket' to _buckets and updates any boundaries if necessary. + */ + void addBucket(Bucket& newBucket); + + /** + * Makes a document using the information from bucket. This is what is returned when getNext() + * is called. + */ + Document makeDocument(const Bucket& bucket); + + std::unique_ptr<Sorter<Value, Document>> _sorter; + std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput; + + // _fieldNames contains the field names for the result documents, _accumulatorFactories contains + // the accumulator factories for the result documents, and _expressions contains the common + // expressions used by each instance of each accumulator in order to find the right-hand side of + // what gets added to the accumulator. These three vectors parallel each other. + std::vector<std::string> _fieldNames; + std::vector<Accumulator::Factory> _accumulatorFactories; + std::vector<boost::intrusive_ptr<Expression>> _expressions; + + int _nBuckets; + uint64_t _maxMemoryUsageBytes; + bool _populated = false; + std::vector<Bucket> _buckets; + std::vector<Bucket>::iterator _bucketsIterator; + std::unique_ptr<Variables> _variables; + boost::intrusive_ptr<Expression> _groupByExpression; + boost::intrusive_ptr<GranularityRounder> _granularityRounder; + long long _nDocuments = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index 484826aaea6..b3678dbce40 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -40,7 +40,9 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_bucket_auto.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" #include "mongo/unittest/temp_dir.h" diff --git a/src/mongo/db/pipeline/document_source_bucket_test.cpp b/src/mongo/db/pipeline/document_source_bucket_test.cpp index 4f6f3c7f9c3..b93916f4347 100644 --- a/src/mongo/db/pipeline/document_source_bucket_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_test.cpp @@ -35,7 +35,10 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_bucket.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 4c81626942b..30ebb4916fc 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_coll_stats.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h new file mode 100644 index 00000000000..d8cf2eb6718 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * Provides a document source interface to retrieve collection-level statistics for a given + * collection. + */ +class DocumentSourceCollStats : public DocumentSourceNeedsMongod { +public: + class LiteParsed final : public LiteParsedDocumentSource { + public: + static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, + const BSONElement& spec) { + return stdx::make_unique<LiteParsed>(); + } + + bool isCollStats() const final { + return true; + } + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + }; + + DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSourceNeedsMongod(pExpCtx) {} + + GetNextResult getNext() final; + + const char* getSourceName() const final; + + bool isValidInitialSource() const final; + + Value serialize(bool explain = false) const; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + // The raw object given to $collStats containing user specified options. + BSONObj _collStatsSpec; + bool _finished = false; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp index 269126b219b..01c4044fece 100644 --- a/src/mongo/db/pipeline/document_source_count.cpp +++ b/src/mongo/db/pipeline/document_source_count.cpp @@ -28,9 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_count.h" #include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_count.h b/src/mongo/db/pipeline/document_source_count.h new file mode 100644 index 00000000000..e1817fbfca3 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_count.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * The $count stage is an alias for a $group stage followed by a $project stage. + */ +class DocumentSourceCount final { +public: + /** + * Returns a $group stage followed by a $project stage. + */ + static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceCount() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_count_test.cpp b/src/mongo/db/pipeline/document_source_count_test.cpp index 552e69150b1..5535ee5ea83 100644 --- a/src/mongo/db/pipeline/document_source_count_test.cpp +++ b/src/mongo/db/pipeline/document_source_count_test.cpp @@ -37,7 +37,9 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_count.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index e22e72c1ede..0ee8332c9bc 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/db_raii.h" diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h new file mode 100644 index 00000000000..4d2cb1a81df --- /dev/null +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <deque> + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/query/plan_summary_stats.h" + +namespace mongo { + +class PlanExecutor; + +/** + * Constructs and returns Documents from the BSONObj objects produced by a supplied + * PlanExecutor. + * + * An object of this type may only be used by one thread, see SERVER-6123. + */ +class DocumentSourceCursor final : public DocumentSource { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + BSONObjSet getOutputSorts() final { + return _outputSorts; + } + /** + * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + Value serialize(bool explain = false) const final; + bool isValidInitialSource() const final { + return true; + } + void dispose() final; + + void detachFromOperationContext() final; + + void reattachToOperationContext(OperationContext* opCtx) final; + + /** + * Create a document source based on a passed-in PlanExecutor. + * + * This is usually put at the beginning of a chain of document sources + * in order to fetch data from the database. + */ + static boost::intrusive_ptr<DocumentSourceCursor> create( + const std::string& ns, + std::unique_ptr<PlanExecutor> exec, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /* + Record the query that was specified for the cursor this wraps, if + any. + + This should be captured after any optimizations are applied to + the pipeline so that it reflects what is really used. + + This gets used for explain output. + + @param pBsonObj the query to record + */ + void setQuery(const BSONObj& query) { + _query = query; + } + + /* + Record the sort that was specified for the cursor this wraps, if + any. + + This should be captured after any optimizations are applied to + the pipeline so that it reflects what is really used. + + This gets used for explain output. + + @param pBsonObj the sort to record + */ + void setSort(const BSONObj& sort) { + _sort = sort; + } + + /** + * Informs this object of projection and dependency information. + * + * @param projection The projection that has been passed down to the query system. + * @param deps The output of DepsTracker::toParsedDeps. + */ + void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps); + + /// returns -1 for no limit + long long getLimit() const; + + /** + * If subsequent sources need no information from the cursor, the cursor can simply output empty + * documents, avoiding the overhead of converting BSONObjs to Documents. + */ + void shouldProduceEmptyDocs() { + _shouldProduceEmptyDocs = true; + } + + const std::string& getPlanSummaryStr() const; + + const PlanSummaryStats& getPlanSummaryStats() const; + +protected: + void doInjectExpressionContext() final; + +private: + DocumentSourceCursor(const std::string& ns, + std::unique_ptr<PlanExecutor> exec, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + void loadBatch(); + + void recordPlanSummaryStr(); + + void recordPlanSummaryStats(); + + std::deque<Document> _currentBatch; + + // BSONObj members must outlive _projection and cursor. + BSONObj _query; + BSONObj _sort; + BSONObj _projection; + bool _shouldProduceEmptyDocs = false; + boost::optional<ParsedDeps> _dependencies; + boost::intrusive_ptr<DocumentSourceLimit> _limit; + long long _docsAddedToBatches; // for _limit enforcement + + const std::string _ns; + std::unique_ptr<PlanExecutor> _exec; + BSONObjSet _outputSorts; + std::string _planSummary; + PlanSummaryStats _planSummaryStats; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 55fd916e02c..7b8cd82de0f 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source_tee_consumer.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/tee_buffer.h" #include "mongo/db/pipeline/value.h" diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 9c8cbce17e0..72d79fd56fe 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -39,6 +39,9 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 1dd78b75a09..8c4a2ca3ff5 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -30,9 +30,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h new file mode 100644 index 00000000000..c38c311a4e5 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/field_path.h" + +namespace mongo { + +class DocumentSourceGeoNear : public DocumentSourceNeedsMongod, public SplittableDocumentSource { +public: + static const long long kDefaultLimit; + + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + /** + * Attempts to combine with a subsequent limit stage, setting the internal limit field + * as a result. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + bool isValidInitialSource() const final { + return true; + } + Value serialize(bool explain = false) const final; + BSONObjSet getOutputSorts() final { + return SimpleBSONObjComparator::kInstance.makeBSONObjSet( + {BSON(distanceField->fullPath() << -1)}); + } + + // Virtuals for SplittableDocumentSource + boost::intrusive_ptr<DocumentSource> getShardSource() final; + boost::intrusive_ptr<DocumentSource> getMergeSource() final; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx); + + static char geoNearName[]; + + long long getLimit() { + return limit; + } + + BSONObj getQuery() const { + return query; + }; + + // this should only be used for testing + static boost::intrusive_ptr<DocumentSourceGeoNear> create( + const boost::intrusive_ptr<ExpressionContext>& pCtx); + +private: + explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + void parseOptions(BSONObj options); + BSONObj buildGeoNearCmd() const; + void runCommand(); + + // These fields describe the command to run. + // coords and distanceField are required, rest are optional + BSONObj coords; // "near" option, but near is a reserved keyword on windows + bool coordsIsArray; + std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty + long long limit; + double maxDistance; + double minDistance; + BSONObj query; + bool spherical; + double distanceMultiplier; + std::unique_ptr<FieldPath> includeLocs; + + // these fields are used while processing the results + BSONObj cmdOutput; + std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"] +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near_test.cpp b/src/mongo/db/pipeline/document_source_geo_near_test.cpp index 2e4ef356114..858cd274f65 100644 --- a/src/mongo/db/pipeline/document_source_geo_near_test.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near_test.cpp @@ -33,7 +33,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_geo_near.h" +#include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/pipeline.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 7b6d6c0b22a..90475dfefcb 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "document_source.h" +#include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/base/init.h" #include "mongo/db/bson/dotted_path_support.h" diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h new file mode 100644 index 00000000000..01473bf44fc --- /dev/null +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/lookup_set_cache.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { + +class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongod { +public: + static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( + const AggregationRequest& request, const BSONElement& spec); + + GetNextResult getNext() final; + const char* getSourceName() const final; + void dispose() final; + BSONObjSet getOutputSorts() final; + void serializeToArray(std::vector<Value>& array, bool explain = false) const final; + + /** + * Returns the 'as' path, and possibly the fields modified by an absorbed $unwind. + */ + GetModPathsReturn getModifiedPaths() const final; + + bool canSwapWithMatch() const final { + return true; + } + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + _startWith->addDependencies(deps); + return SEE_NEXT; + }; + + bool needsPrimaryShard() const final { + return true; + } + + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { + collections->push_back(_from); + } + + void doDetachFromOperationContext() final; + + void doReattachToOperationContext(OperationContext* opCtx) final; + + static boost::intrusive_ptr<DocumentSourceGraphLookUp> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + NamespaceString fromNs, + std::string asField, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<BSONObj> additionalFilter, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +protected: + void doInjectExpressionContext() final; + +private: + DocumentSourceGraphLookUp( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<BSONObj> additionalFilter, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc); + + Value serialize(bool explain = false) const final { + // Should not be called; use serializeToArray instead. + MONGO_UNREACHABLE; + } + + /** + * Prepares the query to execute on the 'from' collection wrapped in a $match by using the + * contents of '_frontier'. + * + * Fills 'cached' with any values that were retrieved from the cache. + * + * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache. + * Otherwise, returns a query object. + */ + boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached); + + /** + * If we have internalized a $unwind, getNext() dispatches to this function. + */ + GetNextResult getNextUnwound(); + + /** + * Perform a breadth-first search of the 'from' collection. '_frontier' should already be + * populated with the values for the initial query. Populates '_discovered' with the result(s) + * of the query. + */ + void doBreadthFirstSearch(); + + /** + * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a + * breadth-first search. Caller should check that _input is not boost::none. + */ + void performSearch(); + + /** + * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying + * for 'queried'. + */ + void addToCache(const BSONObj& result, const ValueUnorderedSet& queried); + + /** + * Assert that '_visited' and '_frontier' have not exceeded the maximum meory usage, and then + * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'. + */ + void checkMemoryUsage(); + + /** + * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier' + * with the object's 'connectTo' values. + * + * Returns whether '_visited' was updated, and thus, whether the search should recurse. + */ + bool addToVisitedAndFrontier(BSONObj result, long long depth); + + // $graphLookup options. + NamespaceString _from; + FieldPath _as; + FieldPath _connectFromField; + FieldPath _connectToField; + boost::intrusive_ptr<Expression> _startWith; + boost::optional<BSONObj> _additionalFilter; + boost::optional<FieldPath> _depthField; + boost::optional<long long> _maxDepth; + + // The ExpressionContext used when performing aggregation pipelines against the '_from' + // namespace. + boost::intrusive_ptr<ExpressionContext> _fromExpCtx; + + // The aggregation pipeline to perform against the '_from' namespace. + std::vector<BSONObj> _fromPipeline; + + size_t _maxMemoryUsageBytes = 100 * 1024 * 1024; + + // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'. + size_t _visitedUsageBytes = 0; + size_t _frontierUsageBytes = 0; + + // Only used during the breadth-first search, tracks the set of values on the current frontier. + // We use boost::optional to defer initialization until the ExpressionContext containing the + // correct comparator is injected. + boost::optional<ValueUnorderedSet> _frontier; + + // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the + // document from the foreign collection, value is the document itself. The keys are compared + // using the simple collation. + ValueUnorderedMap<BSONObj> _visited; + + // Caches query results to avoid repeating any work. This structure is maintained across calls + // to getNext(). + LookupSetCache _cache; + + // When we have internalized a $unwind, we must keep track of the input document, since we will + // need it for multiple "getNext()" calls. + boost::optional<Document> _input; + + // The variables that are in scope to be used by the '_startWith' expression. + std::unique_ptr<Variables> _variables; + + // Keep track of a $unwind that was absorbed into this stage. + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind; + + // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that + // field, tracking how many results we've returned so far for the current input document. + long long _outputIndex; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index ac25e48936e..2229332e859 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -28,13 +28,13 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" - #include <algorithm> #include <deque> #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_graph_lookup.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/stub_mongod_interface.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 31012cdc476..cf91c8eff5f 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -28,12 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" - #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h new file mode 100644 index 00000000000..76d9282ebb9 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_group.h @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> +#include <utility> + +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/sorter/sorter.h" + +namespace mongo { + +class DocumentSourceGroup final : public DocumentSource, public SplittableDocumentSource { +public: + using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>; + using GroupsMap = ValueUnorderedMap<Accumulators>; + + static const size_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; + + // Virtuals from DocumentSource. + boost::intrusive_ptr<DocumentSource> optimize() final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + Value serialize(bool explain = false) const final; + GetNextResult getNext() final; + void dispose() final; + const char* getSourceName() const final; + BSONObjSet getOutputSorts() final; + + /** + * Convenience method for creating a new $group stage. + */ + static boost::intrusive_ptr<DocumentSourceGroup> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& groupByExpression, + std::vector<AccumulationStatement> accumulationStatements, + Variables::Id numVariables, + size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); + + /** + * Parses 'elem' into a $group stage, or throws a UserException if 'elem' was an invalid + * specification. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Add an accumulator, which will become a field in each Document that results from grouping. + */ + void addAccumulator(AccumulationStatement accumulationStatement); + + /** + * Sets the expression to use to determine the group id of each document. + */ + void setIdExpression(const boost::intrusive_ptr<Expression> idExpression); + + /** + * Tell this source if it is doing a merge from shards. Defaults to false. + */ + void setDoingMerge(bool doingMerge) { + _doingMerge = doingMerge; + } + + bool isStreaming() const { + return _streaming; + } + + // Virtuals for SplittableDocumentSource. + boost::intrusive_ptr<DocumentSource> getShardSource() final; + boost::intrusive_ptr<DocumentSource> getMergeSource() final; + +protected: + void doInjectExpressionContext() final; + +private: + explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes); + + /** + * getNext() dispatches to one of these three depending on what type of $group it is. All three + * of these methods expect '_currentAccumulators' to have been reset before being called, and + * also expect initialize() to have been called already. + */ + GetNextResult getNextStreaming(); + GetNextResult getNextSpilled(); + GetNextResult getNextStandard(); + + /** + * Attempt to identify an input sort order that allows us to turn into a streaming $group. If we + * find one, return it. Otherwise, return boost::none. + */ + boost::optional<BSONObj> findRelevantInputSort() const; + + /** + * Before returning anything, this source must prepare itself. In a streaming $group, + * initialize() requests the first document from the previous source, and uses it to prepare the + * accumulators. In an unsorted $group, initialize() exhausts the previous source before + * returning. The '_initialized' boolean indicates that initialize() has finished. + * + * This method may not be able to finish initialization in a single call if 'pSource' returns a + * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult + * encountered, which may be either kEOF or kPauseExecution. + */ + GetNextResult initialize(); + + /** + * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group + * does not exhaust the previous stage before returning, and thus does not maintain as large a + * store of documents at any one time, only an unsorted group can spill to disk. + */ + std::shared_ptr<Sorter<Value, Value>::Iterator> spill(); + + Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); + + /** + * Computes the internal representation of the group key. + */ + Value computeId(Variables* vars); + + /** + * Converts the internal representation of the group key to the _id shape specified by the + * user. + */ + Value expandId(const Value& val); + + /** + * 'vFieldName' contains the field names for the result documents, 'vpAccumulatorFactory' + * contains the accumulator factories for the result documents, and 'vpExpression' contains the + * common expressions used by each instance of each accumulator in order to find the right-hand + * side of what gets added to the accumulator. These three vectors parallel each other. + */ + std::vector<std::string> vFieldName; + std::vector<Accumulator::Factory> vpAccumulatorFactory; + std::vector<boost::intrusive_ptr<Expression>> vpExpression; + + bool _doingMerge; + size_t _memoryUsageBytes = 0; + size_t _maxMemoryUsageBytes; + std::unique_ptr<Variables> _variables; + std::vector<std::string> _idFieldNames; // used when id is a document + std::vector<boost::intrusive_ptr<Expression>> _idExpressions; + + BSONObj _inputSort; + bool _streaming; + bool _initialized; + + Value _currentId; + Accumulators _currentAccumulators; + + // We use boost::optional to defer initialization until the ExpressionContext containing the + // correct comparator is injected, since the groups must be built using the comparator's + // definition of equality. + boost::optional<GroupsMap> _groups; + + std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles; + bool _spilled; + + // Only used when '_spilled' is false. + GroupsMap::iterator groupsIterator; + + // Only used when '_spilled' is true. + std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; + const bool _extSortAllowed; + + std::pair<Value, Value> _firstPartOfNextGroup; + // Only used when '_sorted' is true. + boost::optional<Document> _firstDocOfNextGroup; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 32b6d1f6f87..d0582101597 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -41,7 +41,8 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp index b6feff69eb7..3256f8e5211 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.cpp +++ b/src/mongo/db/pipeline/document_source_index_stats.cpp @@ -28,9 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_index_stats.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/server_options.h" +#include "mongo/util/net/sock.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h new file mode 100644 index 00000000000..1c1bc521aeb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/collection_index_usage_tracker.h" +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * Provides a document source interface to retrieve index statistics for a given namespace. + * Each document returned represents a single index and mongod instance. + */ +class DocumentSourceIndexStats final : public DocumentSourceNeedsMongod { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + + virtual bool isValidInitialSource() const final { + return true; + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + CollectionIndexUsageMap _indexStatsMap; + CollectionIndexUsageMap::const_iterator _indexStatsIter; + std::string _processName; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index 75ca17eca48..634d635b22d 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/document.h" diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h new file mode 100644 index 00000000000..9c04213045d --- /dev/null +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + BSONObjSet getOutputSorts() final { + return pSource ? pSource->getOutputSorts() + : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + } + + /** + * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + Value serialize(bool explain = false) const final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + return SEE_NEXT; // This doesn't affect needed fields + } + + /** + Create a new limiting DocumentSource. + + @param pExpCtx the expression context for the pipeline + @returns the DocumentSource + */ + static boost::intrusive_ptr<DocumentSourceLimit> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); + + // Virtuals for SplittableDocumentSource + // Need to run on rounter. Running on shard as well is an optimization. + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return this; + } + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + long long getLimit() const { + return _limit; + } + void setLimit(long long newLimit) { + _limit = newLimit; + } + + /** + Create a limiting DocumentSource from BSON. + + This is a convenience method that uses the above, and operates on + a BSONElement that has been deteremined to be an Object with an + element named $limit. + + @param pBsonElement the BSONELement that defines the limit + @param pExpCtx the expression context + @returns the grouping DocumentSource + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit); + + long long _limit; + long long _nReturned = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp index 2fc83eed374..6294a897ff2 100644 --- a/src/mongo/db/pipeline/document_source_limit_test.cpp +++ b/src/mongo/db/pipeline/document_source_limit_test.cpp @@ -32,7 +32,9 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 7a0727bcf45..5371542505c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "document_source.h" +#include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/base/init.h" #include "mongo/db/jsobj.h" diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h new file mode 100644 index 00000000000..13118828b65 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -0,0 +1,161 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/lookup_set_cache.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { + +/** + * Queries separate collection for equality matches with documents in the pipeline collection. + * Adds matching documents to a new array field in the input document. + */ +class DocumentSourceLookUp final : public DocumentSourceNeedsMongod, + public SplittableDocumentSource { +public: + static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( + const AggregationRequest& request, const BSONElement& spec); + + GetNextResult getNext() final; + const char* getSourceName() const final; + void serializeToArray(std::vector<Value>& array, bool explain = false) const final; + + /** + * Returns the 'as' path, and possibly fields modified by an absorbed $unwind. + */ + GetModPathsReturn getModifiedPaths() const final; + + bool canSwapWithMatch() const final { + return true; + } + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc' + * field. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + void dispose() final; + + BSONObjSet getOutputSorts() final { + return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); + } + + bool needsPrimaryShard() const final { + return true; + } + + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { + collections->push_back(_fromNs); + } + + void doDetachFromOperationContext() final; + + void doReattachToOperationContext(OperationContext* opCtx) final; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Builds the BSONObj used to query the foreign collection and wraps it in a $match. + */ + static BSONObj makeMatchStageFromInput(const Document& input, + const FieldPath& localFieldName, + const std::string& foreignFieldName, + const BSONObj& additionalFilter); + + /** + * Helper to absorb an $unwind stage. Only used for testing this special behavior. + */ + void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) { + invariant(!_handlingUnwind); + _unwindSrc = unwind; + _handlingUnwind = true; + } + +protected: + void doInjectExpressionContext() final; + +private: + DocumentSourceLookUp(NamespaceString fromNs, + std::string as, + std::string localField, + std::string foreignField, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + Value serialize(bool explain = false) const final { + // Should not be called; use serializeToArray instead. + MONGO_UNREACHABLE; + } + + GetNextResult unwindResult(); + + NamespaceString _fromNs; + FieldPath _as; + FieldPath _localField; + FieldPath _foreignField; + std::string _foreignFieldFieldName; + boost::optional<BSONObj> _additionalFilter; + + // The ExpressionContext used when performing aggregation pipelines against the '_fromNs' + // namespace. + boost::intrusive_ptr<ExpressionContext> _fromExpCtx; + + // The aggregation pipeline to perform against the '_fromNs' namespace. + std::vector<BSONObj> _fromPipeline; + + boost::intrusive_ptr<DocumentSourceMatch> _matchSrc; + boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc; + + bool _handlingUnwind = false; + bool _handlingMatch = false; + + // The following members are used to hold onto state across getNext() calls when + // '_handlingUnwind' is true. + long long _cursorIndex = 0; + boost::intrusive_ptr<Pipeline> _pipeline; + boost::optional<Document> _input; + boost::optional<Document> _nextValue; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 229b50906e9..cbf5b7838a2 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -37,7 +37,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_lookup.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/stub_mongod_interface.h" diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index d8df9116145..f58990a2669 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/jsobj.h" #include "mongo/db/matcher/expression_algo.h" diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h new file mode 100644 index 00000000000..4e2354c6d83 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_match.h @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> +#include <utility> + +#include "mongo/client/connpool.h" +#include "mongo/db/matcher/matcher.h" +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceMatch final : public DocumentSource { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + boost::intrusive_ptr<DocumentSource> optimize() final; + BSONObjSet getOutputSorts() final { + return pSource ? pSource->getOutputSorts() + : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + } + + /** + * Attempts to combine with any subsequent $match stages, joining the query objects with a + * $and. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + void setSource(DocumentSource* Source) final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final; + + /** + * Convenience method for creating a $match stage. + */ + static boost::intrusive_ptr<DocumentSourceMatch> create( + BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** + * Parses a $match stage from 'elem'. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx); + + /** + * Access the MatchExpression stored inside the DocumentSourceMatch. Does not release ownership. + */ + MatchExpression* getMatchExpression() const { + return _expression.get(); + } + + /** + * Combines the filter in this $match with the filter of 'other' using a $and, updating this + * match in place. + */ + void joinMatchWith(boost::intrusive_ptr<DocumentSourceMatch> other); + + /** + * Returns the query in MatchExpression syntax. + */ + BSONObj getQuery() const; + + /** Returns the portion of the match that can safely be promoted to before a $redact. + * If this returns an empty BSONObj, no part of this match may safely be promoted. + * + * To be safe to promote, removing a field from a document to be matched must not cause + * that document to be accepted when it would otherwise be rejected. As an example, + * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that + * running this filter before a redact that would remove the name field would leak + * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents + * that have had their age field removed. + */ + BSONObj redactSafePortion() const; + + static bool isTextQuery(const BSONObj& query); + bool isTextQuery() const { + return _isTextQuery; + } + + /** + * Attempt to split this $match into two stages, where the first is not dependent upon any path + * from 'fields', and where applying them in sequence is equivalent to applying this stage once. + * + * Will return two intrusive_ptrs to new $match stages, where the first pointer is independent + * of 'fields', and the second is dependent. Either pointer may be null, so be sure to check the + * return value. + * + * For example, {$match: {a: "foo", "b.c": 4}} split by "b" will return pointers to two stages: + * {$match: {a: "foo"}}, and {$match: {"b.c": 4}}. + */ + std::pair<boost::intrusive_ptr<DocumentSourceMatch>, boost::intrusive_ptr<DocumentSourceMatch>> + splitSourceBy(const std::set<std::string>& fields); + + /** + * Given a document 'input', extract 'fields' and produce a BSONObj with those values. + */ + static BSONObj getObjectForMatch(const Document& input, const std::set<std::string>& fields); + + /** + * Should be called _only_ on a MatchExpression that is a predicate on 'path', or subfields of + * 'path'. It is also invalid to call this method on a $match including a $elemMatch on 'path', + * for example: {$match: {'path': {$elemMatch: {'subfield': 3}}}} + * + * Returns a new DocumentSourceMatch that, if executed on the subdocument at 'path', is + * equivalent to 'expression'. + * + * For example, if the original expression is {$and: [{'a.b': {$gt: 0}}, {'a.d': {$eq: 3}}]}, + * the new $match will have the expression {$and: [{b: {$gt: 0}}, {d: {$eq: 3}}]} after + * descending on the path 'a'. + */ + static boost::intrusive_ptr<DocumentSourceMatch> descendMatchOnPath( + MatchExpression* matchExpr, + const std::string& path, + boost::intrusive_ptr<ExpressionContext> expCtx); + + void doInjectExpressionContext(); + +private: + DocumentSourceMatch(const BSONObj& query, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + void addDependencies(DepsTracker* deps) const; + + std::unique_ptr<MatchExpression> _expression; + + // Cache the dependencies so that we know what fields we need to serialize to BSON for matching. + DepsTracker _dependencies; + + BSONObj _predicate; + bool _isTextQuery; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp index eb4d40bbc97..61cd89143cd 100644 --- a/src/mongo/db/pipeline/document_source_match_test.cpp +++ b/src/mongo/db/pipeline/document_source_match_test.cpp @@ -35,7 +35,8 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 87a5700f206..866259c663e 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h new file mode 100644 index 00000000000..ad853a9f755 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/client/connpool.h" +#include "mongo/db/clientcursor.h" +#include "mongo/db/cursor_id.h" +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceMergeCursors : public DocumentSource { +public: + struct CursorDescriptor { + CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) + : connectionString(std::move(connectionString)), + ns(std::move(ns)), + cursorId(cursorId) {} + + ConnectionString connectionString; + std::string ns; + CursorId cursorId; + }; + + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + void dispose() final; + Value serialize(bool explain = false) const final; + bool isValidInitialSource() const final { + return true; + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + static boost::intrusive_ptr<DocumentSource> create( + std::vector<CursorDescriptor> cursorDescriptors, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** Returns non-owning pointers to cursors managed by this stage. + * Call this instead of getNext() if you want access to the raw streams. + * This method should only be called at most once. + */ + std::vector<DBClientCursor*> getCursors(); + + /** + * Returns the next object from the cursor, throwing an appropriate exception if the cursor + * reported an error. This is a better form of DBClientCursor::nextSafe. + */ + static Document nextSafeFrom(DBClientCursor* cursor); + +private: + struct CursorAndConnection { + CursorAndConnection(const CursorDescriptor& cursorDescriptor); + ScopedDbConnection connection; + DBClientCursor cursor; + }; + + // using list to enable removing arbitrary elements + typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors; + + DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + // Converts _cursorDescriptors into active _cursors. + void start(); + + // This is the description of cursors to merge. + const std::vector<CursorDescriptor> _cursorDescriptors; + + // These are the actual cursors we are merging. Created lazily. + Cursors _cursors; + Cursors::iterator _currentCursor; + + bool _unstarted; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp index 945aae771f7..686f4ef6f2e 100644 --- a/src/mongo/db/pipeline/document_source_mock.cpp +++ b/src/mongo/db/pipeline/document_source_mock.cpp @@ -28,8 +28,9 @@ #include "mongo/platform/basic.h" +#include "mongo/db/pipeline/document_source_mock.h" + #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h new file mode 100644 index 00000000000..7a600483661 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <deque> + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * Used in testing to store documents without using the storage layer. Methods are not marked as + * final in order to allow tests to intercept calls if needed. + */ +class DocumentSourceMock : public DocumentSource { +public: + DocumentSourceMock(std::deque<GetNextResult> results); + DocumentSourceMock(std::deque<GetNextResult> results, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + GetNextResult getNext() override; + const char* getSourceName() const override; + Value serialize(bool explain = false) const override; + void dispose() override; + bool isValidInitialSource() const override { + return true; + } + BSONObjSet getOutputSorts() override { + return sorts; + } + + static boost::intrusive_ptr<DocumentSourceMock> create(); + + static boost::intrusive_ptr<DocumentSourceMock> create(Document doc); + + static boost::intrusive_ptr<DocumentSourceMock> create(const GetNextResult& result); + static boost::intrusive_ptr<DocumentSourceMock> create(std::deque<GetNextResult> results); + + static boost::intrusive_ptr<DocumentSourceMock> create(const char* json); + static boost::intrusive_ptr<DocumentSourceMock> create( + const std::initializer_list<const char*>& jsons); + + void reattachToOperationContext(OperationContext* opCtx) { + isDetachedFromOpCtx = false; + } + + void detachFromOperationContext() { + isDetachedFromOpCtx = true; + } + + boost::intrusive_ptr<DocumentSource> optimize() override { + isOptimized = true; + return this; + } + + void doInjectExpressionContext() override { + isExpCtxInjected = true; + } + + // Return documents from front of queue. + std::deque<GetNextResult> queue; + + bool isDisposed = false; + bool isDetachedFromOpCtx = false; + bool isOptimized = false; + bool isExpCtxInjected = false; + + BSONObjSet sorts; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_mock_test.cpp b/src/mongo/db/pipeline/document_source_mock_test.cpp index e4826ff5b24..67942a61246 100644 --- a/src/mongo/db/pipeline/document_source_mock_test.cpp +++ b/src/mongo/db/pipeline/document_source_mock_test.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index be9f76d2207..7dab6365405 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_out.h" #include "mongo/stdx/memory.h" #include "mongo/util/destructor_guard.h" diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h new file mode 100644 index 00000000000..4e8a678404a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_out.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceOut final : public DocumentSourceNeedsMongod, public SplittableDocumentSource { +public: + static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse( + const AggregationRequest& request, const BSONElement& spec); + + // virtuals from DocumentSource + ~DocumentSourceOut() final; + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + bool needsPrimaryShard() const final { + return true; + } + + // Virtuals for SplittableDocumentSource + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return NULL; + } + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + const NamespaceString& getOutputNs() const { + return _outputNs; + } + + /** + Create a document source for output and pass-through. + + This can be put anywhere in a pipeline and will store content as + well as pass it on. + + @param pBsonElement the raw BSON specification for the source + @param pExpCtx the expression context for the pipeline + @returns the newly created document source + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceOut(const NamespaceString& outputNs, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Sets '_tempNs' to a unique temporary namespace, makes sure the output collection isn't + * sharded or capped, and saves the collection options and indexes of the target collection. + * Then creates the temporary collection we will insert into by copying the collection options + * and indexes from the target collection. + * + * Sets '_initialized' to true upon completion. + */ + void initialize(); + + /** + * Inserts all of 'toInsert' into the temporary collection. + */ + void spill(const std::vector<BSONObj>& toInsert); + + bool _initialized = false; + bool _done = false; + + // Holds on to the original collection options and index specs so we can check they didn't + // change during computation. + BSONObj _originalOutOptions; + std::list<BSONObj> _originalIndexes; + + NamespaceString _tempNs; // output goes here as it is being processed. + const NamespaceString _outputNs; // output will go here after all data is processed. +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index 8ea144d0d70..fd44d534dde 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_project.h" #include <boost/optional.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp> diff --git a/src/mongo/db/pipeline/document_source_project.h b/src/mongo/db/pipeline/document_source_project.h new file mode 100644 index 00000000000..869dce512b1 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_project.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source_single_document_transformation.h" + +namespace mongo { + +/** + * The $project stage can be used for simple transformations such as including or excluding a set + * of fields, or can do more sophisticated things, like include some fields and add new "computed" + * fields, using the expression language. Note you can not mix an exclusion-style projection with + * adding or including any other fields. + */ +class DocumentSourceProject final { +public: + /** + * Convenience method to create a $project stage from 'projectSpec'. + */ + static boost::intrusive_ptr<DocumentSource> create( + BSONObj projectSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** + * Parses a $project stage from the user-supplied BSON. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceProject() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_project_test.cpp b/src/mongo/db/pipeline/document_source_project_test.cpp index 6dca42a98f3..1247de01579 100644 --- a/src/mongo/db/pipeline/document_source_project_test.cpp +++ b/src/mongo/db/pipeline/document_source_project_test.cpp @@ -36,7 +36,8 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/value.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp index e5d8932b0b2..795cbd99113 100644 --- a/src/mongo/db/pipeline/document_source_redact.cpp +++ b/src/mongo/db/pipeline/document_source_redact.cpp @@ -28,12 +28,13 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_redact.h" #include <boost/optional.hpp> #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/value.h" diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h new file mode 100644 index 00000000000..f056411f765 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -0,0 +1,70 @@ + +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression.h" + +namespace mongo { + +class DocumentSourceRedact final : public DocumentSource { +public: + GetNextResult getNext() final; + const char* getSourceName() const final; + boost::intrusive_ptr<DocumentSource> optimize() final; + + /** + * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact + * stage. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + void doInjectExpressionContext() final; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + Value serialize(bool explain = false) const final; + +private: + DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& previsit); + + // These both work over _variables + boost::optional<Document> redactObject(); // redacts CURRENT + Value redactValue(const Value& in); + + Variables::Id _currentId; + std::unique_ptr<Variables> _variables; + boost::intrusive_ptr<Expression> _expression; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_redact_test.cpp b/src/mongo/db/pipeline/document_source_redact_test.cpp index a6a4bfd9f7b..461c25e3e83 100644 --- a/src/mongo/db/pipeline/document_source_redact_test.cpp +++ b/src/mongo/db/pipeline/document_source_redact_test.cpp @@ -32,7 +32,9 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_redact.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp index 5625e43b1ab..ceef0e77061 100644 --- a/src/mongo/db/pipeline/document_source_replace_root.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_replace_root.h" #include <boost/smart_ptr/intrusive_ptr.hpp> diff --git a/src/mongo/db/pipeline/document_source_replace_root.h b/src/mongo/db/pipeline/document_source_replace_root.h new file mode 100644 index 00000000000..f3313ec5f7a --- /dev/null +++ b/src/mongo/db/pipeline/document_source_replace_root.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source_single_document_transformation.h" + +namespace mongo { + +/* + * $replaceRoot takes an object containing only an expression in the newRoot field, and replaces + * each incoming document with the result of evaluating that expression. Throws an error if the + * given expression is not an object or if the expression evaluates to the "missing" Value. This + * is implemented as an extension of DocumentSourceSingleDocumentTransformation. + */ +class DocumentSourceReplaceRoot final { +public: + /** + * Creates a new replaceRoot DocumentSource from the BSON specification of the $replaceRoot + * stage. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceReplaceRoot() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_replace_root_test.cpp b/src/mongo/db/pipeline/document_source_replace_root_test.cpp index 16e6c2f5fa0..a65c838dd14 100644 --- a/src/mongo/db/pipeline/document_source_replace_root_test.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root_test.cpp @@ -35,7 +35,8 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 53c3040de7d..a436d3e8a53 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/client.h" #include "mongo/db/pipeline/document.h" diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h new file mode 100644 index 00000000000..ed0eec9c180 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_sort.h" + +namespace mongo { + +class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource { +public: + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + return SEE_NEXT; + } + + boost::intrusive_ptr<DocumentSource> getShardSource() final; + boost::intrusive_ptr<DocumentSource> getMergeSource() final; + + long long getSampleSize() const { + return _size; + } + + void doInjectExpressionContext() final; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + +private: + explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + long long _size; + + // Uses a $sort stage to randomly sort the documents. + boost::intrusive_ptr<DocumentSourceSort> _sortStage; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp index 9cfa83075f3..f4b2299c9e8 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" #include <boost/math/distributions/beta.hpp> diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h new file mode 100644 index 00000000000..6e8a2ae1d39 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { + +/** + * This class is not a registered stage, it is only used as an optimized replacement for $sample + * when the storage engine allows us to use a random cursor. + */ +class DocumentSourceSampleFromRandomCursor final : public DocumentSource { +public: + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + + void doInjectExpressionContext() final; + + static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long size, + std::string idField, + long long collectionSize); + +private: + DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long size, + std::string idField, + long long collectionSize); + + /** + * Keep asking for documents from the random cursor until it yields a new document. Errors if a + * a document is encountered without a value for '_idField', or if the random cursor keeps + * returning duplicate elements. + */ + GetNextResult getNextNonDuplicateDocument(); + + long long _size; + + // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog. + std::string _idField; + + // Keeps track of the documents that have been returned, since a random cursor is allowed to + // return duplicates. We use boost::optional to defer initialization until the ExpressionContext + // containing the correct comparator is injected. + boost::optional<ValueUnorderedSet> _seenDocs; + + // The approximate number of documents in the collection (includes orphans). + const long long _nDocsInColl; + + // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext() + // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if + // the documents were produced by a top-k random sort. + double _randMetaFieldVal = 1.0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp index 22d6a4e0f25..30a59baf22b 100644 --- a/src/mongo/db/pipeline/document_source_sample_test.cpp +++ b/src/mongo/db/pipeline/document_source_sample_test.cpp @@ -35,7 +35,9 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sample.h" +#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/service_context.h" diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index 953bcc81e53..67dd7338395 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -28,11 +28,13 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" #include <boost/smart_ptr/intrusive_ptr.hpp> #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/value.h" diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h new file mode 100644 index 00000000000..b2db5b0c3ae --- /dev/null +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This class is for DocumentSources that take in and return one document at a time, in a 1:1 + * transformation. It should only be used via an alias that passes the transformation logic through + * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be + * created from BSON. + */ +class DocumentSourceSingleDocumentTransformation final : public DocumentSource { +public: + /** + * This class defines the minimal interface that every parser wishing to take advantage of + * DocumentSourceSingleDocumentTransformation must implement. + * + * This interface ensures that DocumentSourceSingleDocumentTransformations are passed parsed + * objects that can execute the transformation and provide additional features like + * serialization and reporting and returning dependencies. The parser must also provide + * implementations for optimizing and adding the expression context, even if those functions do + * nothing. + */ + class TransformerInterface { + public: + virtual ~TransformerInterface() = default; + virtual Document applyTransformation(Document input) = 0; + virtual void optimize() = 0; + virtual Document serialize(bool explain) const = 0; + virtual DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const = 0; + virtual void injectExpressionContext( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx) = 0; + virtual GetModPathsReturn getModifiedPaths() const = 0; + }; + + DocumentSourceSingleDocumentTransformation( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + std::unique_ptr<TransformerInterface> parsedTransform, + std::string name); + + // virtuals from DocumentSource + const char* getSourceName() const final; + GetNextResult getNext() final; + boost::intrusive_ptr<DocumentSource> optimize() final; + void dispose() final; + Value serialize(bool explain) const final; + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + void doInjectExpressionContext() final; + DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; + GetModPathsReturn getModifiedPaths() const final; + + bool canSwapWithMatch() const final { + return true; + } + +private: + // Stores transformation logic. + std::unique_ptr<TransformerInterface> _parsedTransform; + + // Specific name of the transformation. + std::string _name; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index e052c24895d..27bb6d1d2a6 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -28,10 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h new file mode 100644 index 00000000000..92d087e3c75 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + /** + * Attempts to move a subsequent $limit before the skip, potentially allowing for forther + * optimizations earlier in the pipeline. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + Value serialize(bool explain = false) const final; + boost::intrusive_ptr<DocumentSource> optimize() final; + BSONObjSet getOutputSorts() final { + return pSource ? pSource->getOutputSorts() + : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); + } + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + return SEE_NEXT; // This doesn't affect needed fields + } + + // Virtuals for SplittableDocumentSource + // Need to run on rounter. Can't run on shards. + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return NULL; + } + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + long long getSkip() const { + return _nToSkip; + } + void setSkip(long long newSkip) { + _nToSkip = newSkip; + } + + /** + * Convenience method for creating a $skip stage. + */ + static boost::intrusive_ptr<DocumentSourceSkip> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip); + + /** + * Parses the user-supplied BSON into a $skip stage. + * + * Throws a UserException if 'elem' is an invalid $skip specification. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + long long nToSkip); + + long long _nToSkip = 0; + long long _nSkippedSoFar = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_skip_test.cpp b/src/mongo/db/pipeline/document_source_skip_test.cpp index 175efad4788..4a5710bc5bc 100644 --- a/src/mongo/db/pipeline/document_source_skip_test.cpp +++ b/src/mongo/db/pipeline/document_source_skip_test.cpp @@ -30,7 +30,8 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_skip.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 156b0499d20..4f4f17da4f2 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -28,10 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h new file mode 100644 index 00000000000..d9266011a55 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/sorter/sorter.h" + +namespace mongo { + +class Expression; + +class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource { +public: + static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; + + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + void serializeToArray(std::vector<Value>& array, bool explain = false) const final; + + GetModPathsReturn getModifiedPaths() const final { + // A $sort does not modify any paths. + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}}; + } + + bool canSwapWithMatch() const final { + return true; + } + + BSONObjSet getOutputSorts() final { + return allPrefixes(_sort); + } + + /** + * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort. + */ + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + void dispose() final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final; + + boost::intrusive_ptr<DocumentSource> getShardSource() final; + boost::intrusive_ptr<DocumentSource> getMergeSource() final; + + /// Write out a Document whose contents are the sort key. + Document serializeSortKey(bool explain) const; + + /** + * Parses a $sort stage from the user-supplied BSON. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** + * Convenience method for creating a $sort stage. + */ + static boost::intrusive_ptr<DocumentSourceSort> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + BSONObj sortOrder, + long long limit = -1, + uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); + + /** + * Returns -1 for no limit. + */ + long long getLimit() const; + + /** + * Loads a document to be sorted. This can be used to sort a stream of documents that are not + * coming from another DocumentSource. Once all documents have been added, the caller must call + * loadingDone() before using getNext() to receive the documents in sorted order. + */ + void loadDocument(const Document& doc); + + /** + * Signals to the sort stage that there will be no more input documents. It is an error to call + * loadDocument() once this method returns. + */ + void loadingDone(); + + /** + * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that + * have already been sorted. + */ + void populateFromCursors(const std::vector<DBClientCursor*>& cursors); + + bool isPopulated() { + return _populated; + }; + + boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const { + return limitSrc; + } + +private: + explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + Value serialize(bool explain = false) const final { + MONGO_UNREACHABLE; // Should call serializeToArray instead. + } + + /** + * Helper to add a sort key to this stage. + */ + void addKey(StringData fieldPath, bool ascending); + + /** + * Before returning anything, we have to consume all input and sort it. This method consumes all + * input and prepares the sorted stream '_output'. + * + * This method may not be able to finish populating the sorter in a single call if 'pSource' + * returns a DocumentSource::GetNextResult::kPauseExecution, so it returns the last + * GetNextResult encountered, which may be either kEOF or kPauseExecution. + */ + GetNextResult populate(); + bool _populated = false; + + BSONObj _sort; + + SortOptions makeSortOptions() const; + + // This is used to merge pre-sorted results from a DocumentSourceMergeCursors. + class IteratorFromCursor; + + /* these two parallel each other */ + typedef std::vector<boost::intrusive_ptr<Expression>> SortKey; + SortKey vSortKey; + std::vector<char> vAscending; // used like std::vector<bool> but without specialization + + /// Extracts the fields in vSortKey from the Document; + Value extractKey(const Document& d) const; + + /// Compare two Values according to the specified sort key. + int compare(const Value& lhs, const Value& rhs) const; + + typedef Sorter<Value, Document> MySorter; + + /** + * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep + * the smallest limit. + */ + void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) { + if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) { + limitSrc = limit; + } + } + + // For MySorter + class Comparator { + public: + explicit Comparator(const DocumentSourceSort& source) : _source(source) {} + int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const { + return _source.compare(lhs.first, rhs.first); + } + + private: + const DocumentSourceSort& _source; + }; + + boost::intrusive_ptr<DocumentSourceLimit> limitSrc; + + uint64_t _maxMemoryUsageBytes; + bool _done; + bool _mergingPresorted; + std::unique_ptr<MySorter> _sorter; + std::unique_ptr<MySorter::Iterator> _output; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp index 021843b3e7f..48cfaa9f559 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp +++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp @@ -28,9 +28,11 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_sort_by_count.h" #include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.h b/src/mongo/db/pipeline/document_source_sort_by_count.h new file mode 100644 index 00000000000..2444d048259 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sort_by_count.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * The $sortByCount stage is an alias for a $group stage followed by a $sort stage. + */ +class DocumentSourceSortByCount final { +public: + /** + * Returns a $group stage followed by a $sort stage. + */ + static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceSortByCount() = default; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp index bed658e616a..46afa82a3a5 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp @@ -36,7 +36,10 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_sort_by_count.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 022b2040ba3..52da88fd616 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -40,7 +40,8 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/temp_dir.h" diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp index 02b8dc62355..368b29afbd4 100644 --- a/src/mongo/db/pipeline/document_source_unwind.cpp +++ b/src/mongo/db/pipeline/document_source_unwind.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/jsobj.h" #include "mongo/db/pipeline/document.h" diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h new file mode 100644 index 00000000000..9a9c466d037 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/field_path.h" + +namespace mongo { + +class DocumentSourceUnwind final : public DocumentSource { +public: + // virtuals from DocumentSource + GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + BSONObjSet getOutputSorts() final; + + /** + * Returns the unwound path, and the 'includeArrayIndex' path, if specified. + */ + GetModPathsReturn getModifiedPaths() const final; + + bool canSwapWithMatch() const final { + return true; + } + + GetDepsReturn getDependencies(DepsTracker* deps) const final; + + /** + * Creates a new $unwind DocumentSource from a BSON specification. + */ + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + static boost::intrusive_ptr<DocumentSourceUnwind> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const std::string& path, + bool includeNullIfEmptyOrMissing, + const boost::optional<std::string>& includeArrayIndex); + + std::string getUnwindPath() const { + return _unwindPath.fullPath(); + } + + bool preserveNullAndEmptyArrays() const { + return _preserveNullAndEmptyArrays; + } + + const boost::optional<FieldPath>& indexPath() const { + return _indexPath; + } + +private: + DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const FieldPath& fieldPath, + bool includeNullIfEmptyOrMissing, + const boost::optional<FieldPath>& includeArrayIndex); + + // Configuration state. + const FieldPath _unwindPath; + // Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass + // through the $unwind stage unmodified if '_preserveNullAndEmptyArrays' is true. + const bool _preserveNullAndEmptyArrays; + // If set, the $unwind stage will include the array index in the specified path, overwriting any + // existing value, setting to null when the value was a non-array or empty array. + const boost::optional<FieldPath> _indexPath; + + // Iteration state. + class Unwinder; + std::unique_ptr<Unwinder> _unwinder; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_unwind_test.cpp b/src/mongo/db/pipeline/document_source_unwind_test.cpp index a782d7b521f..1e62f3bc9fc 100644 --- a/src/mongo/db/pipeline/document_source_unwind_test.cpp +++ b/src/mongo/db/pipeline/document_source_unwind_test.cpp @@ -39,7 +39,8 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value_comparator.h" diff --git a/src/mongo/db/pipeline/parsed_aggregation_projection.h b/src/mongo/db/pipeline/parsed_aggregation_projection.h index 6e8d9100845..a637365565e 100644 --- a/src/mongo/db/pipeline/parsed_aggregation_projection.h +++ b/src/mongo/db/pipeline/parsed_aggregation_projection.h @@ -34,7 +34,7 @@ #include <memory> #include "mongo/bson/bsonelement.h" -#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/field_path.h" namespace mongo { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index f003df39da9..b296d3f85df 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -40,6 +40,11 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_geo_near.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 2666ff1cccb..bb681edd3f1 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -49,6 +49,12 @@ #include "mongo/db/index/index_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/document_source_sample.h" +#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 315b0b10cfd..7b4331d60b8 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/field_path.h" diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp index 10a692922e0..0a8fc7d22a2 100644 --- a/src/mongo/db/pipeline/tee_buffer_test.cpp +++ b/src/mongo/db/pipeline/tee_buffer_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/tee_buffer.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" |