summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/document_source.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source.h1794
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.h57
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_bucket.h50
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h158
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h77
-rw-r--r--src/mongo/db/pipeline/document_source_count.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_count.h50
-rw-r--r--src/mongo/db/pipeline/document_source_count_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h166
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h104
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h219
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_group.h197
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h62
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h102
-rw-r--r--src/mongo/db/pipeline/document_source_limit_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h161
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_match.h162
-rw-r--r--src/mongo/db/pipeline/document_source_match_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h105
-rw-r--r--src/mongo/db/pipeline/document_source_mock.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h97
-rw-r--r--src/mongo/db/pipeline/document_source_mock_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_out.h106
-rw-r--r--src/mongo/db/pipeline/document_source_project.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_project.h59
-rw-r--r--src/mongo/db/pipeline/document_source_project_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_redact.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h70
-rw-r--r--src/mongo/db/pipeline/document_source_redact_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.h54
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h67
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.h87
-rw-r--r--src/mongo/db/pipeline/document_source_sample_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h94
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h95
-rw-r--r--src/mongo/db/pipeline/document_source_skip_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h197
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count.h50
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h99
-rw-r--r--src/mongo/db/pipeline/document_source_unwind_test.cpp3
-rw-r--r--src/mongo/db/pipeline/parsed_aggregation_projection.h2
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp5
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp6
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp1
-rw-r--r--src/mongo/db/pipeline/tee_buffer_test.cpp1
-rw-r--r--src/mongo/db/views/view_catalog.cpp1
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp1
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp2
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp1
-rw-r--r--src/mongo/s/commands/cluster_aggregate.h1
85 files changed, 2864 insertions, 1840 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index a78c307fad3..b5e5309d9e6 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -28,9 +28,12 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/pipeline/document_source.h"
+
+#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/util/string_map.h"
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index d5d3c02f07b..81ba3fc61a6 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -30,50 +30,32 @@
#include "mongo/platform/basic.h"
+#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
-#include <deque>
#include <list>
+#include <memory>
#include <string>
-#include <utility>
#include <vector>
#include "mongo/base/init.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/client/connpool.h"
-#include "mongo/db/clientcursor.h"
+#include "mongo/client/dbclientinterface.h"
#include "mongo/db/collection_index_usage_tracker.h"
#include "mongo/db/jsobj.h"
-#include "mongo/db/matcher/matcher.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/accumulation_statement.h"
-#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/pipeline/granularity_rounder.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/db/pipeline/lookup_set_cache.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/value.h"
-#include "mongo/db/pipeline/value_comparator.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/sorter/sorter.h"
#include "mongo/stdx/functional.h"
-#include "mongo/stdx/unordered_map.h"
#include "mongo/util/intrusive_counter.h"
namespace mongo {
class AggregationRequest;
class Document;
-class Expression;
-class ExpressionFieldPath;
-class ExpressionObject;
-class DocumentSourceLimit;
-class DocumentSourceSort;
-class PlanExecutor;
-class RecordCursor;
/**
* Registers a DocumentSource to have the name 'key'.
@@ -655,1775 +637,5 @@ protected:
std::shared_ptr<MongodInterface> _mongod;
};
-/**
- * Constructs and returns Documents from the BSONObj objects produced by a supplied
- * PlanExecutor.
- *
- * An object of this type may only be used by one thread, see SERVER-6123.
- */
-class DocumentSourceCursor final : public DocumentSource {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- BSONObjSet getOutputSorts() final {
- return _outputSorts;
- }
- /**
- * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- Value serialize(bool explain = false) const final;
- bool isValidInitialSource() const final {
- return true;
- }
- void dispose() final;
-
- void detachFromOperationContext() final;
-
- void reattachToOperationContext(OperationContext* opCtx) final;
-
- /**
- * Create a document source based on a passed-in PlanExecutor.
- *
- * This is usually put at the beginning of a chain of document sources
- * in order to fetch data from the database.
- */
- static boost::intrusive_ptr<DocumentSourceCursor> create(
- const std::string& ns,
- std::unique_ptr<PlanExecutor> exec,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /*
- Record the query that was specified for the cursor this wraps, if
- any.
-
- This should be captured after any optimizations are applied to
- the pipeline so that it reflects what is really used.
-
- This gets used for explain output.
-
- @param pBsonObj the query to record
- */
- void setQuery(const BSONObj& query) {
- _query = query;
- }
-
- /*
- Record the sort that was specified for the cursor this wraps, if
- any.
-
- This should be captured after any optimizations are applied to
- the pipeline so that it reflects what is really used.
-
- This gets used for explain output.
-
- @param pBsonObj the sort to record
- */
- void setSort(const BSONObj& sort) {
- _sort = sort;
- }
-
- /**
- * Informs this object of projection and dependency information.
- *
- * @param projection The projection that has been passed down to the query system.
- * @param deps The output of DepsTracker::toParsedDeps.
- */
- void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
-
- /// returns -1 for no limit
- long long getLimit() const;
-
- /**
- * If subsequent sources need no information from the cursor, the cursor can simply output empty
- * documents, avoiding the overhead of converting BSONObjs to Documents.
- */
- void shouldProduceEmptyDocs() {
- _shouldProduceEmptyDocs = true;
- }
-
- const std::string& getPlanSummaryStr() const;
-
- const PlanSummaryStats& getPlanSummaryStats() const;
-
-protected:
- void doInjectExpressionContext() final;
-
-private:
- DocumentSourceCursor(const std::string& ns,
- std::unique_ptr<PlanExecutor> exec,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- void loadBatch();
-
- void recordPlanSummaryStr();
-
- void recordPlanSummaryStats();
-
- std::deque<Document> _currentBatch;
-
- // BSONObj members must outlive _projection and cursor.
- BSONObj _query;
- BSONObj _sort;
- BSONObj _projection;
- bool _shouldProduceEmptyDocs = false;
- boost::optional<ParsedDeps> _dependencies;
- boost::intrusive_ptr<DocumentSourceLimit> _limit;
- long long _docsAddedToBatches; // for _limit enforcement
-
- const std::string _ns;
- std::unique_ptr<PlanExecutor> _exec;
- BSONObjSet _outputSorts;
- std::string _planSummary;
- PlanSummaryStats _planSummaryStats;
-};
-
-
-class DocumentSourceGroup final : public DocumentSource, public SplittableDocumentSource {
-public:
- using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
- using GroupsMap = ValueUnorderedMap<Accumulators>;
-
- static const size_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
-
- // Virtuals from DocumentSource.
- boost::intrusive_ptr<DocumentSource> optimize() final;
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
- Value serialize(bool explain = false) const final;
- GetNextResult getNext() final;
- void dispose() final;
- const char* getSourceName() const final;
- BSONObjSet getOutputSorts() final;
-
- /**
- * Convenience method for creating a new $group stage.
- */
- static boost::intrusive_ptr<DocumentSourceGroup> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<Expression>& groupByExpression,
- std::vector<AccumulationStatement> accumulationStatements,
- Variables::Id numVariables,
- size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
-
- /**
- * Parses 'elem' into a $group stage, or throws a UserException if 'elem' was an invalid
- * specification.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /**
- * Add an accumulator, which will become a field in each Document that results from grouping.
- */
- void addAccumulator(AccumulationStatement accumulationStatement);
-
- /**
- * Sets the expression to use to determine the group id of each document.
- */
- void setIdExpression(const boost::intrusive_ptr<Expression> idExpression);
-
- /**
- * Tell this source if it is doing a merge from shards. Defaults to false.
- */
- void setDoingMerge(bool doingMerge) {
- _doingMerge = doingMerge;
- }
-
- bool isStreaming() const {
- return _streaming;
- }
-
- // Virtuals for SplittableDocumentSource.
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- boost::intrusive_ptr<DocumentSource> getMergeSource() final;
-
-protected:
- void doInjectExpressionContext() final;
-
-private:
- explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
-
- /**
- * getNext() dispatches to one of these three depending on what type of $group it is. All three
- * of these methods expect '_currentAccumulators' to have been reset before being called, and
- * also expect initialize() to have been called already.
- */
- GetNextResult getNextStreaming();
- GetNextResult getNextSpilled();
- GetNextResult getNextStandard();
-
- /**
- * Attempt to identify an input sort order that allows us to turn into a streaming $group. If we
- * find one, return it. Otherwise, return boost::none.
- */
- boost::optional<BSONObj> findRelevantInputSort() const;
-
- /**
- * Before returning anything, this source must prepare itself. In a streaming $group,
- * initialize() requests the first document from the previous source, and uses it to prepare the
- * accumulators. In an unsorted $group, initialize() exhausts the previous source before
- * returning. The '_initialized' boolean indicates that initialize() has finished.
- *
- * This method may not be able to finish initialization in a single call if 'pSource' returns a
- * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult
- * encountered, which may be either kEOF or kPauseExecution.
- */
- GetNextResult initialize();
-
- /**
- * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group
- * does not exhaust the previous stage before returning, and thus does not maintain as large a
- * store of documents at any one time, only an unsorted group can spill to disk.
- */
- std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
-
- Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
-
- /**
- * Computes the internal representation of the group key.
- */
- Value computeId(Variables* vars);
-
- /**
- * Converts the internal representation of the group key to the _id shape specified by the
- * user.
- */
- Value expandId(const Value& val);
-
- /**
- * 'vFieldName' contains the field names for the result documents, 'vpAccumulatorFactory'
- * contains the accumulator factories for the result documents, and 'vpExpression' contains the
- * common expressions used by each instance of each accumulator in order to find the right-hand
- * side of what gets added to the accumulator. These three vectors parallel each other.
- */
- std::vector<std::string> vFieldName;
- std::vector<Accumulator::Factory> vpAccumulatorFactory;
- std::vector<boost::intrusive_ptr<Expression>> vpExpression;
-
- bool _doingMerge;
- size_t _memoryUsageBytes = 0;
- size_t _maxMemoryUsageBytes;
- std::unique_ptr<Variables> _variables;
- std::vector<std::string> _idFieldNames; // used when id is a document
- std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
-
- BSONObj _inputSort;
- bool _streaming;
- bool _initialized;
-
- Value _currentId;
- Accumulators _currentAccumulators;
-
- // We use boost::optional to defer initialization until the ExpressionContext containing the
- // correct comparator is injected, since the groups must be built using the comparator's
- // definition of equality.
- boost::optional<GroupsMap> _groups;
-
- std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
- bool _spilled;
-
- // Only used when '_spilled' is false.
- GroupsMap::iterator groupsIterator;
-
- // Only used when '_spilled' is true.
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
- const bool _extSortAllowed;
-
- std::pair<Value, Value> _firstPartOfNextGroup;
- // Only used when '_sorted' is true.
- boost::optional<Document> _firstDocOfNextGroup;
-};
-
-/**
- * Provides a document source interface to retrieve index statistics for a given namespace.
- * Each document returned represents a single index and mongod instance.
- */
-class DocumentSourceIndexStats final : public DocumentSourceNeedsMongod {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
-
- virtual bool isValidInitialSource() const final {
- return true;
- }
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- CollectionIndexUsageMap _indexStatsMap;
- CollectionIndexUsageMap::const_iterator _indexStatsIter;
- std::string _processName;
-};
-
-class DocumentSourceMatch final : public DocumentSource {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
- boost::intrusive_ptr<DocumentSource> optimize() final;
- BSONObjSet getOutputSorts() final {
- return pSource ? pSource->getOutputSorts()
- : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
- }
-
- /**
- * Attempts to combine with any subsequent $match stages, joining the query objects with a
- * $and.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- void setSource(DocumentSource* Source) final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- /**
- * Convenience method for creating a $match stage.
- */
- static boost::intrusive_ptr<DocumentSourceMatch> create(
- BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- /**
- * Parses a $match stage from 'elem'.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
-
- /**
- * Access the MatchExpression stored inside the DocumentSourceMatch. Does not release ownership.
- */
- MatchExpression* getMatchExpression() const {
- return _expression.get();
- }
-
- /**
- * Combines the filter in this $match with the filter of 'other' using a $and, updating this
- * match in place.
- */
- void joinMatchWith(boost::intrusive_ptr<DocumentSourceMatch> other);
-
- /**
- * Returns the query in MatchExpression syntax.
- */
- BSONObj getQuery() const;
-
- /** Returns the portion of the match that can safely be promoted to before a $redact.
- * If this returns an empty BSONObj, no part of this match may safely be promoted.
- *
- * To be safe to promote, removing a field from a document to be matched must not cause
- * that document to be accepted when it would otherwise be rejected. As an example,
- * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that
- * running this filter before a redact that would remove the name field would leak
- * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents
- * that have had their age field removed.
- */
- BSONObj redactSafePortion() const;
-
- static bool isTextQuery(const BSONObj& query);
- bool isTextQuery() const {
- return _isTextQuery;
- }
-
- /**
- * Attempt to split this $match into two stages, where the first is not dependent upon any path
- * from 'fields', and where applying them in sequence is equivalent to applying this stage once.
- *
- * Will return two intrusive_ptrs to new $match stages, where the first pointer is independent
- * of 'fields', and the second is dependent. Either pointer may be null, so be sure to check the
- * return value.
- *
- * For example, {$match: {a: "foo", "b.c": 4}} split by "b" will return pointers to two stages:
- * {$match: {a: "foo"}}, and {$match: {"b.c": 4}}.
- */
- std::pair<boost::intrusive_ptr<DocumentSourceMatch>, boost::intrusive_ptr<DocumentSourceMatch>>
- splitSourceBy(const std::set<std::string>& fields);
-
- /**
- * Given a document 'input', extract 'fields' and produce a BSONObj with those values.
- */
- static BSONObj getObjectForMatch(const Document& input, const std::set<std::string>& fields);
-
- /**
- * Should be called _only_ on a MatchExpression that is a predicate on 'path', or subfields of
- * 'path'. It is also invalid to call this method on a $match including a $elemMatch on 'path',
- * for example: {$match: {'path': {$elemMatch: {'subfield': 3}}}}
- *
- * Returns a new DocumentSourceMatch that, if executed on the subdocument at 'path', is
- * equivalent to 'expression'.
- *
- * For example, if the original expression is {$and: [{'a.b': {$gt: 0}}, {'a.d': {$eq: 3}}]},
- * the new $match will have the expression {$and: [{b: {$gt: 0}}, {d: {$eq: 3}}]} after
- * descending on the path 'a'.
- */
- static boost::intrusive_ptr<DocumentSourceMatch> descendMatchOnPath(
- MatchExpression* matchExpr,
- const std::string& path,
- boost::intrusive_ptr<ExpressionContext> expCtx);
-
- void doInjectExpressionContext();
-
-private:
- DocumentSourceMatch(const BSONObj& query,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- void addDependencies(DepsTracker* deps) const;
-
- std::unique_ptr<MatchExpression> _expression;
-
- // Cache the dependencies so that we know what fields we need to serialize to BSON for matching.
- DepsTracker _dependencies;
-
- BSONObj _predicate;
- bool _isTextQuery;
-};
-
-class DocumentSourceMergeCursors : public DocumentSource {
-public:
- struct CursorDescriptor {
- CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
- : connectionString(std::move(connectionString)),
- ns(std::move(ns)),
- cursorId(cursorId) {}
-
- ConnectionString connectionString;
- std::string ns;
- CursorId cursorId;
- };
-
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- void dispose() final;
- Value serialize(bool explain = false) const final;
- bool isValidInitialSource() const final {
- return true;
- }
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- static boost::intrusive_ptr<DocumentSource> create(
- std::vector<CursorDescriptor> cursorDescriptors,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /** Returns non-owning pointers to cursors managed by this stage.
- * Call this instead of getNext() if you want access to the raw streams.
- * This method should only be called at most once.
- */
- std::vector<DBClientCursor*> getCursors();
-
- /**
- * Returns the next object from the cursor, throwing an appropriate exception if the cursor
- * reported an error. This is a better form of DBClientCursor::nextSafe.
- */
- static Document nextSafeFrom(DBClientCursor* cursor);
-
-private:
- struct CursorAndConnection {
- CursorAndConnection(const CursorDescriptor& cursorDescriptor);
- ScopedDbConnection connection;
- DBClientCursor cursor;
- };
-
- // using list to enable removing arbitrary elements
- typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
-
- DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- // Converts _cursorDescriptors into active _cursors.
- void start();
-
- // This is the description of cursors to merge.
- const std::vector<CursorDescriptor> _cursorDescriptors;
-
- // These are the actual cursors we are merging. Created lazily.
- Cursors _cursors;
- Cursors::iterator _currentCursor;
-
- bool _unstarted;
-};
-
-/**
- * Used in testing to store documents without using the storage layer. Methods are not marked as
- * final in order to allow tests to intercept calls if needed.
- */
-class DocumentSourceMock : public DocumentSource {
-public:
- DocumentSourceMock(std::deque<GetNextResult> results);
- DocumentSourceMock(std::deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- GetNextResult getNext() override;
- const char* getSourceName() const override;
- Value serialize(bool explain = false) const override;
- void dispose() override;
- bool isValidInitialSource() const override {
- return true;
- }
- BSONObjSet getOutputSorts() override {
- return sorts;
- }
-
- static boost::intrusive_ptr<DocumentSourceMock> create();
-
- static boost::intrusive_ptr<DocumentSourceMock> create(Document doc);
-
- static boost::intrusive_ptr<DocumentSourceMock> create(const GetNextResult& result);
- static boost::intrusive_ptr<DocumentSourceMock> create(std::deque<GetNextResult> results);
-
- static boost::intrusive_ptr<DocumentSourceMock> create(const char* json);
- static boost::intrusive_ptr<DocumentSourceMock> create(
- const std::initializer_list<const char*>& jsons);
-
- void reattachToOperationContext(OperationContext* opCtx) {
- isDetachedFromOpCtx = false;
- }
-
- void detachFromOperationContext() {
- isDetachedFromOpCtx = true;
- }
-
- boost::intrusive_ptr<DocumentSource> optimize() override {
- isOptimized = true;
- return this;
- }
-
- void doInjectExpressionContext() override {
- isExpCtxInjected = true;
- }
-
- // Return documents from front of queue.
- std::deque<GetNextResult> queue;
-
- bool isDisposed = false;
- bool isDetachedFromOpCtx = false;
- bool isOptimized = false;
- bool isExpCtxInjected = false;
-
- BSONObjSet sorts;
-};
-
-/**
- * This class is for DocumentSources that take in and return one document at a time, in a 1:1
- * transformation. It should only be used via an alias that passes the transformation logic through
- * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be
- * created from BSON.
- */
-class DocumentSourceSingleDocumentTransformation final : public DocumentSource {
-public:
- /**
- * This class defines the minimal interface that every parser wishing to take advantage of
- * DocumentSourceSingleDocumentTransformation must implement.
- *
- * This interface ensures that DocumentSourceSingleDocumentTransformations are passed parsed
- * objects that can execute the transformation and provide additional features like
- * serialization and reporting and returning dependencies. The parser must also provide
- * implementations for optimizing and adding the expression context, even if those functions do
- * nothing.
- */
- class TransformerInterface {
- public:
- virtual ~TransformerInterface() = default;
- virtual Document applyTransformation(Document input) = 0;
- virtual void optimize() = 0;
- virtual Document serialize(bool explain) const = 0;
- virtual DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const = 0;
- virtual void injectExpressionContext(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx) = 0;
- virtual GetModPathsReturn getModifiedPaths() const = 0;
- };
-
- DocumentSourceSingleDocumentTransformation(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- std::unique_ptr<TransformerInterface> parsedTransform,
- std::string name);
-
- // virtuals from DocumentSource
- const char* getSourceName() const final;
- GetNextResult getNext() final;
- boost::intrusive_ptr<DocumentSource> optimize() final;
- void dispose() final;
- Value serialize(bool explain) const final;
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- void doInjectExpressionContext() final;
- DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
- GetModPathsReturn getModifiedPaths() const final;
-
- bool canSwapWithMatch() const final {
- return true;
- }
-
-private:
- // Stores transformation logic.
- std::unique_ptr<TransformerInterface> _parsedTransform;
-
- // Specific name of the transformation.
- std::string _name;
-};
-
-class DocumentSourceOut final : public DocumentSourceNeedsMongod, public SplittableDocumentSource {
-public:
- static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
- const AggregationRequest& request, const BSONElement& spec);
-
- // virtuals from DocumentSource
- ~DocumentSourceOut() final;
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
- bool needsPrimaryShard() const final {
- return true;
- }
-
- // Virtuals for SplittableDocumentSource
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return NULL;
- }
- boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
- }
-
- const NamespaceString& getOutputNs() const {
- return _outputNs;
- }
-
- /**
- Create a document source for output and pass-through.
-
- This can be put anywhere in a pipeline and will store content as
- well as pass it on.
-
- @param pBsonElement the raw BSON specification for the source
- @param pExpCtx the expression context for the pipeline
- @returns the newly created document source
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceOut(const NamespaceString& outputNs,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /**
- * Sets '_tempNs' to a unique temporary namespace, makes sure the output collection isn't
- * sharded or capped, and saves the collection options and indexes of the target collection.
- * Then creates the temporary collection we will insert into by copying the collection options
- * and indexes from the target collection.
- *
- * Sets '_initialized' to true upon completion.
- */
- void initialize();
-
- /**
- * Inserts all of 'toInsert' into the temporary collection.
- */
- void spill(const std::vector<BSONObj>& toInsert);
-
- bool _initialized = false;
- bool _done = false;
-
- // Holds on to the original collection options and index specs so we can check they didn't
- // change during computation.
- BSONObj _originalOutOptions;
- std::list<BSONObj> _originalIndexes;
-
- NamespaceString _tempNs; // output goes here as it is being processed.
- const NamespaceString _outputNs; // output will go here after all data is processed.
-};
-
-class DocumentSourceRedact final : public DocumentSource {
-public:
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- boost::intrusive_ptr<DocumentSource> optimize() final;
-
- /**
- * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
- * stage.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
-
- void doInjectExpressionContext() final;
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- Value serialize(bool explain = false) const final;
-
-private:
- DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<Expression>& previsit);
-
- // These both work over _variables
- boost::optional<Document> redactObject(); // redacts CURRENT
- Value redactValue(const Value& in);
-
- Variables::Id _currentId;
- std::unique_ptr<Variables> _variables;
- boost::intrusive_ptr<Expression> _expression;
-};
-
-/*
- * $replaceRoot takes an object containing only an expression in the newRoot field, and replaces
- * each incoming document with the result of evaluating that expression. Throws an error if the
- * given expression is not an object or if the expression evaluates to the "missing" Value. This
- * is implemented as an extension of DocumentSourceSingleDocumentTransformation.
- */
-class DocumentSourceReplaceRoot final {
-public:
- /**
- * Creates a new replaceRoot DocumentSource from the BSON specification of the $replaceRoot
- * stage.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceReplaceRoot() = default;
-};
-
-class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource {
-public:
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final {
- return SEE_NEXT;
- }
-
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- boost::intrusive_ptr<DocumentSource> getMergeSource() final;
-
- long long getSampleSize() const {
- return _size;
- }
-
- void doInjectExpressionContext() final;
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
-private:
- explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- long long _size;
-
- // Uses a $sort stage to randomly sort the documents.
- boost::intrusive_ptr<DocumentSourceSort> _sortStage;
-};
-
-/**
- * This class is not a registered stage, it is only used as an optimized replacement for $sample
- * when the storage engine allows us to use a random cursor.
- */
-class DocumentSourceSampleFromRandomCursor final : public DocumentSource {
-public:
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- void doInjectExpressionContext() final;
-
- static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- long long size,
- std::string idField,
- long long collectionSize);
-
-private:
- DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- long long size,
- std::string idField,
- long long collectionSize);
-
- /**
- * Keep asking for documents from the random cursor until it yields a new document. Errors if a
- * a document is encountered without a value for '_idField', or if the random cursor keeps
- * returning duplicate elements.
- */
- GetNextResult getNextNonDuplicateDocument();
-
- long long _size;
-
- // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog.
- std::string _idField;
-
- // Keeps track of the documents that have been returned, since a random cursor is allowed to
- // return duplicates. We use boost::optional to defer initialization until the ExpressionContext
- // containing the correct comparator is injected.
- boost::optional<ValueUnorderedSet> _seenDocs;
-
- // The approximate number of documents in the collection (includes orphans).
- const long long _nDocsInColl;
-
- // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext()
- // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if
- // the documents were produced by a top-k random sort.
- double _randMetaFieldVal = 1.0;
-};
-
-class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- BSONObjSet getOutputSorts() final {
- return pSource ? pSource->getOutputSorts()
- : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
- }
-
- /**
- * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- Value serialize(bool explain = false) const final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final {
- return SEE_NEXT; // This doesn't affect needed fields
- }
-
- /**
- Create a new limiting DocumentSource.
-
- @param pExpCtx the expression context for the pipeline
- @returns the DocumentSource
- */
- static boost::intrusive_ptr<DocumentSourceLimit> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
-
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Running on shard as well is an optimization.
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return this;
- }
- boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
- }
-
- long long getLimit() const {
- return _limit;
- }
- void setLimit(long long newLimit) {
- _limit = newLimit;
- }
-
- /**
- Create a limiting DocumentSource from BSON.
-
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $limit.
-
- @param pBsonElement the BSONELement that defines the limit
- @param pExpCtx the expression context
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
-
- long long _limit;
- long long _nReturned = 0;
-};
-
-class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
-public:
- static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
-
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
-
- GetModPathsReturn getModifiedPaths() const final {
- // A $sort does not modify any paths.
- return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}};
- }
-
- bool canSwapWithMatch() const final {
- return true;
- }
-
- BSONObjSet getOutputSorts() final {
- return allPrefixes(_sort);
- }
-
- /**
- * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- void dispose() final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- boost::intrusive_ptr<DocumentSource> getMergeSource() final;
-
- /// Write out a Document whose contents are the sort key.
- Document serializeSortKey(bool explain) const;
-
- /**
- * Parses a $sort stage from the user-supplied BSON.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /**
- * Convenience method for creating a $sort stage.
- */
- static boost::intrusive_ptr<DocumentSourceSort> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- BSONObj sortOrder,
- long long limit = -1,
- uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
-
- /**
- * Returns -1 for no limit.
- */
- long long getLimit() const;
-
- /**
- * Loads a document to be sorted. This can be used to sort a stream of documents that are not
- * coming from another DocumentSource. Once all documents have been added, the caller must call
- * loadingDone() before using getNext() to receive the documents in sorted order.
- */
- void loadDocument(const Document& doc);
-
- /**
- * Signals to the sort stage that there will be no more input documents. It is an error to call
- * loadDocument() once this method returns.
- */
- void loadingDone();
-
- /**
- * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
- * have already been sorted.
- */
- void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
-
- bool isPopulated() {
- return _populated;
- };
-
- boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
- return limitSrc;
- }
-
-private:
- explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- Value serialize(bool explain = false) const final {
- MONGO_UNREACHABLE; // Should call serializeToArray instead.
- }
-
- /**
- * Helper to add a sort key to this stage.
- */
- void addKey(StringData fieldPath, bool ascending);
-
- /**
- * Before returning anything, we have to consume all input and sort it. This method consumes all
- * input and prepares the sorted stream '_output'.
- *
- * This method may not be able to finish populating the sorter in a single call if 'pSource'
- * returns a DocumentSource::GetNextResult::kPauseExecution, so it returns the last
- * GetNextResult encountered, which may be either kEOF or kPauseExecution.
- */
- GetNextResult populate();
- bool _populated = false;
-
- BSONObj _sort;
-
- SortOptions makeSortOptions() const;
-
- // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
- class IteratorFromCursor;
-
- /* these two parallel each other */
- typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
- SortKey vSortKey;
- std::vector<char> vAscending; // used like std::vector<bool> but without specialization
-
- /// Extracts the fields in vSortKey from the Document;
- Value extractKey(const Document& d) const;
-
- /// Compare two Values according to the specified sort key.
- int compare(const Value& lhs, const Value& rhs) const;
-
- typedef Sorter<Value, Document> MySorter;
-
- /**
- * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep
- * the smallest limit.
- */
- void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) {
- if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) {
- limitSrc = limit;
- }
- }
-
- // For MySorter
- class Comparator {
- public:
- explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
- int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
- return _source.compare(lhs.first, rhs.first);
- }
-
- private:
- const DocumentSourceSort& _source;
- };
-
- boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
-
- uint64_t _maxMemoryUsageBytes;
- bool _done;
- bool _mergingPresorted;
- std::unique_ptr<MySorter> _sorter;
- std::unique_ptr<MySorter::Iterator> _output;
-};
-
-class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- /**
- * Attempts to move a subsequent $limit before the skip, potentially allowing for forther
- * optimizations earlier in the pipeline.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- Value serialize(bool explain = false) const final;
- boost::intrusive_ptr<DocumentSource> optimize() final;
- BSONObjSet getOutputSorts() final {
- return pSource ? pSource->getOutputSorts()
- : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
- }
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final {
- return SEE_NEXT; // This doesn't affect needed fields
- }
-
- // Virtuals for SplittableDocumentSource
- // Need to run on rounter. Can't run on shards.
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return NULL;
- }
- boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
- }
-
- long long getSkip() const {
- return _nToSkip;
- }
- void setSkip(long long newSkip) {
- _nToSkip = newSkip;
- }
-
- /**
- * Convenience method for creating a $skip stage.
- */
- static boost::intrusive_ptr<DocumentSourceSkip> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip);
-
- /**
- * Parses the user-supplied BSON into a $skip stage.
- *
- * Throws a UserException if 'elem' is an invalid $skip specification.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- long long nToSkip);
-
- long long _nToSkip = 0;
- long long _nSkippedSoFar = 0;
-};
-
-
-class DocumentSourceUnwind final : public DocumentSource {
-public:
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(bool explain = false) const final;
- BSONObjSet getOutputSorts() final;
-
- /**
- * Returns the unwound path, and the 'includeArrayIndex' path, if specified.
- */
- GetModPathsReturn getModifiedPaths() const final;
-
- bool canSwapWithMatch() const final {
- return true;
- }
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- /**
- * Creates a new $unwind DocumentSource from a BSON specification.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- static boost::intrusive_ptr<DocumentSourceUnwind> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const std::string& path,
- bool includeNullIfEmptyOrMissing,
- const boost::optional<std::string>& includeArrayIndex);
-
- std::string getUnwindPath() const {
- return _unwindPath.fullPath();
- }
-
- bool preserveNullAndEmptyArrays() const {
- return _preserveNullAndEmptyArrays;
- }
-
- const boost::optional<FieldPath>& indexPath() const {
- return _indexPath;
- }
-
-private:
- DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- const FieldPath& fieldPath,
- bool includeNullIfEmptyOrMissing,
- const boost::optional<FieldPath>& includeArrayIndex);
-
- // Configuration state.
- const FieldPath _unwindPath;
- // Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass
- // through the $unwind stage unmodified if '_preserveNullAndEmptyArrays' is true.
- const bool _preserveNullAndEmptyArrays;
- // If set, the $unwind stage will include the array index in the specified path, overwriting any
- // existing value, setting to null when the value was a non-array or empty array.
- const boost::optional<FieldPath> _indexPath;
-
- // Iteration state.
- class Unwinder;
- std::unique_ptr<Unwinder> _unwinder;
-};
-
-class DocumentSourceGeoNear : public DocumentSourceNeedsMongod, public SplittableDocumentSource {
-public:
- static const long long kDefaultLimit;
-
- // virtuals from DocumentSource
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- /**
- * Attempts to combine with a subsequent limit stage, setting the internal limit field
- * as a result.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- bool isValidInitialSource() const final {
- return true;
- }
- Value serialize(bool explain = false) const final;
- BSONObjSet getOutputSorts() final {
- return SimpleBSONObjComparator::kInstance.makeBSONObjSet(
- {BSON(distanceField->fullPath() << -1)});
- }
-
- // Virtuals for SplittableDocumentSource
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- boost::intrusive_ptr<DocumentSource> getMergeSource() final;
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
-
- static char geoNearName[];
-
- long long getLimit() {
- return limit;
- }
-
- BSONObj getQuery() const {
- return query;
- };
-
- // this should only be used for testing
- static boost::intrusive_ptr<DocumentSourceGeoNear> create(
- const boost::intrusive_ptr<ExpressionContext>& pCtx);
-
-private:
- explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- void parseOptions(BSONObj options);
- BSONObj buildGeoNearCmd() const;
- void runCommand();
-
- // These fields describe the command to run.
- // coords and distanceField are required, rest are optional
- BSONObj coords; // "near" option, but near is a reserved keyword on windows
- bool coordsIsArray;
- std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty
- long long limit;
- double maxDistance;
- double minDistance;
- BSONObj query;
- bool spherical;
- double distanceMultiplier;
- std::unique_ptr<FieldPath> includeLocs;
-
- // these fields are used while processing the results
- BSONObj cmdOutput;
- std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"]
-};
-
-/**
- * Queries separate collection for equality matches with documents in the pipeline collection.
- * Adds matching documents to a new array field in the input document.
- */
-class DocumentSourceLookUp final : public DocumentSourceNeedsMongod,
- public SplittableDocumentSource {
-public:
- static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
- const AggregationRequest& request, const BSONElement& spec);
-
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
-
- /**
- * Returns the 'as' path, and possibly fields modified by an absorbed $unwind.
- */
- GetModPathsReturn getModifiedPaths() const final;
-
- bool canSwapWithMatch() const final {
- return true;
- }
-
- /**
- * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc'
- * field.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
- void dispose() final;
-
- BSONObjSet getOutputSorts() final {
- return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
- }
-
- bool needsPrimaryShard() const final {
- return true;
- }
-
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
-
- boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
- }
-
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
- collections->push_back(_fromNs);
- }
-
- void doDetachFromOperationContext() final;
-
- void doReattachToOperationContext(OperationContext* opCtx) final;
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- /**
- * Builds the BSONObj used to query the foreign collection and wraps it in a $match.
- */
- static BSONObj makeMatchStageFromInput(const Document& input,
- const FieldPath& localFieldName,
- const std::string& foreignFieldName,
- const BSONObj& additionalFilter);
-
- /**
- * Helper to absorb an $unwind stage. Only used for testing this special behavior.
- */
- void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) {
- invariant(!_handlingUnwind);
- _unwindSrc = unwind;
- _handlingUnwind = true;
- }
-
-protected:
- void doInjectExpressionContext() final;
-
-private:
- DocumentSourceLookUp(NamespaceString fromNs,
- std::string as,
- std::string localField,
- std::string foreignField,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
- Value serialize(bool explain = false) const final {
- // Should not be called; use serializeToArray instead.
- MONGO_UNREACHABLE;
- }
-
- GetNextResult unwindResult();
-
- NamespaceString _fromNs;
- FieldPath _as;
- FieldPath _localField;
- FieldPath _foreignField;
- std::string _foreignFieldFieldName;
- boost::optional<BSONObj> _additionalFilter;
-
- // The ExpressionContext used when performing aggregation pipelines against the '_fromNs'
- // namespace.
- boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
-
- // The aggregation pipeline to perform against the '_fromNs' namespace.
- std::vector<BSONObj> _fromPipeline;
-
- boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
- boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
-
- bool _handlingUnwind = false;
- bool _handlingMatch = false;
-
- // The following members are used to hold onto state across getNext() calls when
- // '_handlingUnwind' is true.
- long long _cursorIndex = 0;
- boost::intrusive_ptr<Pipeline> _pipeline;
- boost::optional<Document> _input;
- boost::optional<Document> _nextValue;
-};
-
-class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongod {
-public:
- static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
- const AggregationRequest& request, const BSONElement& spec);
-
- GetNextResult getNext() final;
- const char* getSourceName() const final;
- void dispose() final;
- BSONObjSet getOutputSorts() final;
- void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
-
- /**
- * Returns the 'as' path, and possibly the fields modified by an absorbed $unwind.
- */
- GetModPathsReturn getModifiedPaths() const final;
-
- bool canSwapWithMatch() const final {
- return true;
- }
-
- /**
- * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field.
- */
- Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
- Pipeline::SourceContainer* container) final;
-
- GetDepsReturn getDependencies(DepsTracker* deps) const final {
- _startWith->addDependencies(deps);
- return SEE_NEXT;
- };
-
- bool needsPrimaryShard() const final {
- return true;
- }
-
- void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
- collections->push_back(_from);
- }
-
- void doDetachFromOperationContext() final;
-
- void doReattachToOperationContext(OperationContext* opCtx) final;
-
- static boost::intrusive_ptr<DocumentSourceGraphLookUp> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- NamespaceString fromNs,
- std::string asField,
- std::string connectFromField,
- std::string connectToField,
- boost::intrusive_ptr<Expression> startWith,
- boost::optional<BSONObj> additionalFilter,
- boost::optional<FieldPath> depthField,
- boost::optional<long long> maxDepth,
- boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-protected:
- void doInjectExpressionContext() final;
-
-private:
- DocumentSourceGraphLookUp(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- NamespaceString from,
- std::string as,
- std::string connectFromField,
- std::string connectToField,
- boost::intrusive_ptr<Expression> startWith,
- boost::optional<BSONObj> additionalFilter,
- boost::optional<FieldPath> depthField,
- boost::optional<long long> maxDepth,
- boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
-
- Value serialize(bool explain = false) const final {
- // Should not be called; use serializeToArray instead.
- MONGO_UNREACHABLE;
- }
-
- /**
- * Prepares the query to execute on the 'from' collection wrapped in a $match by using the
- * contents of '_frontier'.
- *
- * Fills 'cached' with any values that were retrieved from the cache.
- *
- * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache.
- * Otherwise, returns a query object.
- */
- boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached);
-
- /**
- * If we have internalized a $unwind, getNext() dispatches to this function.
- */
- GetNextResult getNextUnwound();
-
- /**
- * Perform a breadth-first search of the 'from' collection. '_frontier' should already be
- * populated with the values for the initial query. Populates '_discovered' with the result(s)
- * of the query.
- */
- void doBreadthFirstSearch();
-
- /**
- * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a
- * breadth-first search. Caller should check that _input is not boost::none.
- */
- void performSearch();
-
- /**
- * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying
- * for 'queried'.
- */
- void addToCache(const BSONObj& result, const ValueUnorderedSet& queried);
-
- /**
- * Assert that '_visited' and '_frontier' have not exceeded the maximum meory usage, and then
- * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'.
- */
- void checkMemoryUsage();
-
- /**
- * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier'
- * with the object's 'connectTo' values.
- *
- * Returns whether '_visited' was updated, and thus, whether the search should recurse.
- */
- bool addToVisitedAndFrontier(BSONObj result, long long depth);
-
- // $graphLookup options.
- NamespaceString _from;
- FieldPath _as;
- FieldPath _connectFromField;
- FieldPath _connectToField;
- boost::intrusive_ptr<Expression> _startWith;
- boost::optional<BSONObj> _additionalFilter;
- boost::optional<FieldPath> _depthField;
- boost::optional<long long> _maxDepth;
-
- // The ExpressionContext used when performing aggregation pipelines against the '_from'
- // namespace.
- boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
-
- // The aggregation pipeline to perform against the '_from' namespace.
- std::vector<BSONObj> _fromPipeline;
-
- size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
-
- // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
- size_t _visitedUsageBytes = 0;
- size_t _frontierUsageBytes = 0;
-
- // Only used during the breadth-first search, tracks the set of values on the current frontier.
- // We use boost::optional to defer initialization until the ExpressionContext containing the
- // correct comparator is injected.
- boost::optional<ValueUnorderedSet> _frontier;
-
- // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the
- // document from the foreign collection, value is the document itself. The keys are compared
- // using the simple collation.
- ValueUnorderedMap<BSONObj> _visited;
-
- // Caches query results to avoid repeating any work. This structure is maintained across calls
- // to getNext().
- LookupSetCache _cache;
-
- // When we have internalized a $unwind, we must keep track of the input document, since we will
- // need it for multiple "getNext()" calls.
- boost::optional<Document> _input;
-
- // The variables that are in scope to be used by the '_startWith' expression.
- std::unique_ptr<Variables> _variables;
-
- // Keep track of a $unwind that was absorbed into this stage.
- boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind;
-
- // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that
- // field, tracking how many results we've returned so far for the current input document.
- long long _outputIndex;
-};
-
-class DocumentSourceSortByCount final {
-public:
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceSortByCount() = default;
-};
-
-class DocumentSourceCount final {
-public:
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceCount() = default;
-};
-
-class DocumentSourceBucket final {
-public:
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceBucket() = default;
-};
-
-/**
- * The $project stage can be used for simple transformations such as including or excluding a set
- * of fields, or can do more sophisticated things, like include some fields and add new "computed"
- * fields, using the expression language. Note you can not mix an exclusion-style projection with
- * adding or including any other fields.
- */
-class DocumentSourceProject final {
-public:
- /**
- * Convenience method to create a $project stage from 'projectSpec'.
- */
- static boost::intrusive_ptr<DocumentSource> create(
- BSONObj projectSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- /**
- * Parses a $project stage from the user-supplied BSON.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceProject() = default;
-};
-
-/**
- * $addFields adds or replaces the specified fields to/in the document while preserving the original
- * document. It is modeled on and throws the same errors as $project.
- */
-class DocumentSourceAddFields final {
-public:
- /**
- * Convenience method for creating a $addFields stage from 'addFieldsSpec'.
- */
- static boost::intrusive_ptr<DocumentSource> create(
- BSONObj addFieldsSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- /**
- * Parses a $addFields stage from the user-supplied BSON.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
-private:
- DocumentSourceAddFields() = default;
-};
-
-/**
- * Provides a document source interface to retrieve collection-level statistics for a given
- * collection.
- */
-class DocumentSourceCollStats : public DocumentSourceNeedsMongod {
-public:
- class LiteParsed final : public LiteParsedDocumentSource {
- public:
- static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
- const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>();
- }
-
- bool isCollStats() const final {
- return true;
- }
-
- stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
- return stdx::unordered_set<NamespaceString>();
- }
- };
-
- DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSourceNeedsMongod(pExpCtx) {}
-
- GetNextResult getNext() final;
-
- const char* getSourceName() const final;
-
- bool isValidInitialSource() const final;
-
- Value serialize(bool explain = false) const;
-
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- // The raw object given to $collStats containing user specified options.
- BSONObj _collStatsSpec;
- bool _finished = false;
-};
-
-/**
- * The $bucketAuto stage takes a user-specified number of buckets and automatically determines
- * boundaries such that the values are approximately equally distributed between those buckets.
- */
-class DocumentSourceBucketAuto final : public DocumentSource, public SplittableDocumentSource {
-public:
- Value serialize(bool explain = false) const final;
- GetDepsReturn getDependencies(DepsTracker* deps) const final;
- GetNextResult getNext() final;
- void dispose() final;
- const char* getSourceName() const final;
-
- /**
- * The $bucketAuto stage must be run on the merging shard.
- */
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- boost::intrusive_ptr<DocumentSource> getMergeSource() final {
- return this;
- }
-
- static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
-
- /**
- * Convenience method to create a $bucketAuto stage.
- *
- * If 'accumulationStatements' is the empty vector, it will be filled in with the statement
- * 'count: {$sum: 1}'.
- */
- static boost::intrusive_ptr<DocumentSourceBucketAuto> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<Expression>& groupByExpression,
- Variables::Id numVariables,
- int numBuckets,
- std::vector<AccumulationStatement> accumulationStatements = {},
- const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr,
- uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
-
- /**
- * Parses a $bucketAuto stage from the user-supplied BSON.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
-private:
- DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- const boost::intrusive_ptr<Expression>& groupByExpression,
- Variables::Id numVariables,
- int numBuckets,
- std::vector<AccumulationStatement> accumulationStatements,
- const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
- uint64_t maxMemoryUsageBytes);
-
- // struct for holding information about a bucket.
- struct Bucket {
- Bucket(Value min, Value max, std::vector<Accumulator::Factory> accumulatorFactories);
- Value _min;
- Value _max;
- std::vector<boost::intrusive_ptr<Accumulator>> _accums;
- };
-
- /**
- * Consumes all of the documents from the source in the pipeline and sorts them by their
- * 'groupBy' value. This method might not be able to finish populating the sorter in a single
- * call if 'pSource' returns a DocumentSource::GetNextResult::kPauseExecution, so this returns
- * the last GetNextResult encountered, which may be either kEOF or kPauseExecution.
- */
- GetNextResult populateSorter();
-
- /**
- * Computes the 'groupBy' expression value for 'doc'.
- */
- Value extractKey(const Document& doc);
-
- /**
- * Calculates the bucket boundaries for the input documents and places them into buckets.
- */
- void populateBuckets();
-
- /**
- * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'.
- */
- void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket);
-
- /**
- * Adds 'newBucket' to _buckets and updates any boundaries if necessary.
- */
- void addBucket(Bucket& newBucket);
-
- /**
- * Makes a document using the information from bucket. This is what is returned when getNext()
- * is called.
- */
- Document makeDocument(const Bucket& bucket);
-
- std::unique_ptr<Sorter<Value, Document>> _sorter;
- std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput;
-
- // _fieldNames contains the field names for the result documents, _accumulatorFactories contains
- // the accumulator factories for the result documents, and _expressions contains the common
- // expressions used by each instance of each accumulator in order to find the right-hand side of
- // what gets added to the accumulator. These three vectors parallel each other.
- std::vector<std::string> _fieldNames;
- std::vector<Accumulator::Factory> _accumulatorFactories;
- std::vector<boost::intrusive_ptr<Expression>> _expressions;
-
- int _nBuckets;
- uint64_t _maxMemoryUsageBytes;
- bool _populated = false;
- std::vector<Bucket> _buckets;
- std::vector<Bucket>::iterator _bucketsIterator;
- std::unique_ptr<Variables> _variables;
- boost::intrusive_ptr<Expression> _groupByExpression;
- boost::intrusive_ptr<GranularityRounder> _granularityRounder;
- long long _nDocuments = 0;
-};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp
index 7f9c2383b58..0f7366538f9 100644
--- a/src/mongo/db/pipeline/document_source_add_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
diff --git a/src/mongo/db/pipeline/document_source_add_fields.h b/src/mongo/db/pipeline/document_source_add_fields.h
new file mode 100644
index 00000000000..b1ce0415ab3
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_add_fields.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+
+namespace mongo {
+
+/**
+ * $addFields adds or replaces the specified fields to/in the document while preserving the original
+ * document. It is modeled on and throws the same errors as $project.
+ */
+class DocumentSourceAddFields final {
+public:
+ /**
+ * Convenience method for creating a $addFields stage from 'addFieldsSpec'.
+ */
+ static boost::intrusive_ptr<DocumentSource> create(
+ BSONObj addFieldsSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
+ * Parses a $addFields stage from the user-supplied BSON.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+private:
+ DocumentSourceAddFields() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_add_fields_test.cpp b/src/mongo/db/pipeline/document_source_add_fields_test.cpp
index 724c896e4c0..53ff48ae906 100644
--- a/src/mongo/db/pipeline/document_source_add_fields_test.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields_test.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp
index 484f4b9a62f..05152b7e313 100644
--- a/src/mongo/db/pipeline/document_source_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket.cpp
@@ -26,8 +26,11 @@
* it in the license file.
*/
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_bucket.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/document_source_bucket.h b/src/mongo/db/pipeline/document_source_bucket.h
new file mode 100644
index 00000000000..cd7fe31b14e
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_bucket.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * The $bucket stage is an alias for a $group stage followed by a $sort stage.
+ */
+class DocumentSourceBucket final {
+public:
+ /**
+ * Returns a $group stage followed by a $sort stage.
+ */
+ static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceBucket() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index 31374548175..c9a09354434 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_bucket_auto.h"
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
new file mode 100644
index 00000000000..9279bcd52b9
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/granularity_rounder.h"
+#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+
+/**
+ * The $bucketAuto stage takes a user-specified number of buckets and automatically determines
+ * boundaries such that the values are approximately equally distributed between those buckets.
+ */
+class DocumentSourceBucketAuto final : public DocumentSource, public SplittableDocumentSource {
+public:
+ Value serialize(bool explain = false) const final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ GetNextResult getNext() final;
+ void dispose() final;
+ const char* getSourceName() const final;
+
+ /**
+ * The $bucketAuto stage must be run on the merging shard.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ /**
+ * Convenience method to create a $bucketAuto stage.
+ *
+ * If 'accumulationStatements' is the empty vector, it will be filled in with the statement
+ * 'count: {$sum: 1}'.
+ */
+ static boost::intrusive_ptr<DocumentSourceBucketAuto> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements = {},
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder = nullptr,
+ uint64_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
+
+ /**
+ * Parses a $bucketAuto stage from the user-supplied BSON.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ Variables::Id numVariables,
+ int numBuckets,
+ std::vector<AccumulationStatement> accumulationStatements,
+ const boost::intrusive_ptr<GranularityRounder>& granularityRounder,
+ uint64_t maxMemoryUsageBytes);
+
+ // struct for holding information about a bucket.
+ struct Bucket {
+ Bucket(Value min, Value max, std::vector<Accumulator::Factory> accumulatorFactories);
+ Value _min;
+ Value _max;
+ std::vector<boost::intrusive_ptr<Accumulator>> _accums;
+ };
+
+ /**
+ * Consumes all of the documents from the source in the pipeline and sorts them by their
+ * 'groupBy' value. This method might not be able to finish populating the sorter in a single
+ * call if 'pSource' returns a DocumentSource::GetNextResult::kPauseExecution, so this returns
+ * the last GetNextResult encountered, which may be either kEOF or kPauseExecution.
+ */
+ GetNextResult populateSorter();
+
+ /**
+ * Computes the 'groupBy' expression value for 'doc'.
+ */
+ Value extractKey(const Document& doc);
+
+ /**
+ * Calculates the bucket boundaries for the input documents and places them into buckets.
+ */
+ void populateBuckets();
+
+ /**
+ * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'.
+ */
+ void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket);
+
+ /**
+ * Adds 'newBucket' to _buckets and updates any boundaries if necessary.
+ */
+ void addBucket(Bucket& newBucket);
+
+ /**
+ * Makes a document using the information from bucket. This is what is returned when getNext()
+ * is called.
+ */
+ Document makeDocument(const Bucket& bucket);
+
+ std::unique_ptr<Sorter<Value, Document>> _sorter;
+ std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput;
+
+ // _fieldNames contains the field names for the result documents, _accumulatorFactories contains
+ // the accumulator factories for the result documents, and _expressions contains the common
+ // expressions used by each instance of each accumulator in order to find the right-hand side of
+ // what gets added to the accumulator. These three vectors parallel each other.
+ std::vector<std::string> _fieldNames;
+ std::vector<Accumulator::Factory> _accumulatorFactories;
+ std::vector<boost::intrusive_ptr<Expression>> _expressions;
+
+ int _nBuckets;
+ uint64_t _maxMemoryUsageBytes;
+ bool _populated = false;
+ std::vector<Bucket> _buckets;
+ std::vector<Bucket>::iterator _bucketsIterator;
+ std::unique_ptr<Variables> _variables;
+ boost::intrusive_ptr<Expression> _groupByExpression;
+ boost::intrusive_ptr<GranularityRounder> _granularityRounder;
+ long long _nDocuments = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 484826aaea6..b3678dbce40 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -40,7 +40,9 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_bucket_auto.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/unittest/temp_dir.h"
diff --git a/src/mongo/db/pipeline/document_source_bucket_test.cpp b/src/mongo/db/pipeline/document_source_bucket_test.cpp
index 4f6f3c7f9c3..b93916f4347 100644
--- a/src/mongo/db/pipeline/document_source_bucket_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_test.cpp
@@ -35,7 +35,10 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_bucket.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 4c81626942b..30ebb4916fc 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_coll_stats.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
new file mode 100644
index 00000000000..d8cf2eb6718
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Provides a document source interface to retrieve collection-level statistics for a given
+ * collection.
+ */
+class DocumentSourceCollStats : public DocumentSourceNeedsMongod {
+public:
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
+ const BSONElement& spec) {
+ return stdx::make_unique<LiteParsed>();
+ }
+
+ bool isCollStats() const final {
+ return true;
+ }
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return stdx::unordered_set<NamespaceString>();
+ }
+ };
+
+ DocumentSourceCollStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSourceNeedsMongod(pExpCtx) {}
+
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final;
+
+ bool isValidInitialSource() const final;
+
+ Value serialize(bool explain = false) const;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ // The raw object given to $collStats containing user specified options.
+ BSONObj _collStatsSpec;
+ bool _finished = false;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp
index 269126b219b..01c4044fece 100644
--- a/src/mongo/db/pipeline/document_source_count.cpp
+++ b/src/mongo/db/pipeline/document_source_count.cpp
@@ -28,9 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_count.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_count.h b/src/mongo/db/pipeline/document_source_count.h
new file mode 100644
index 00000000000..e1817fbfca3
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_count.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * The $count stage is an alias for a $group stage followed by a $project stage.
+ */
+class DocumentSourceCount final {
+public:
+ /**
+ * Returns a $group stage followed by a $project stage.
+ */
+ static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceCount() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_count_test.cpp b/src/mongo/db/pipeline/document_source_count_test.cpp
index 552e69150b1..5535ee5ea83 100644
--- a/src/mongo/db/pipeline/document_source_count_test.cpp
+++ b/src/mongo/db/pipeline/document_source_count_test.cpp
@@ -37,7 +37,9 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_count.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index e22e72c1ede..0ee8332c9bc 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/db_raii.h"
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
new file mode 100644
index 00000000000..4d2cb1a81df
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/query/plan_summary_stats.h"
+
+namespace mongo {
+
+class PlanExecutor;
+
+/**
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied
+ * PlanExecutor.
+ *
+ * An object of this type may only be used by one thread, see SERVER-6123.
+ */
+class DocumentSourceCursor final : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ BSONObjSet getOutputSorts() final {
+ return _outputSorts;
+ }
+ /**
+ * Attempts to combine with any subsequent $limit stages by setting the internal '_limit' field.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ Value serialize(bool explain = false) const final;
+ bool isValidInitialSource() const final {
+ return true;
+ }
+ void dispose() final;
+
+ void detachFromOperationContext() final;
+
+ void reattachToOperationContext(OperationContext* opCtx) final;
+
+ /**
+ * Create a document source based on a passed-in PlanExecutor.
+ *
+ * This is usually put at the beginning of a chain of document sources
+ * in order to fetch data from the database.
+ */
+ static boost::intrusive_ptr<DocumentSourceCursor> create(
+ const std::string& ns,
+ std::unique_ptr<PlanExecutor> exec,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /*
+ Record the query that was specified for the cursor this wraps, if
+ any.
+
+ This should be captured after any optimizations are applied to
+ the pipeline so that it reflects what is really used.
+
+ This gets used for explain output.
+
+ @param pBsonObj the query to record
+ */
+ void setQuery(const BSONObj& query) {
+ _query = query;
+ }
+
+ /*
+ Record the sort that was specified for the cursor this wraps, if
+ any.
+
+ This should be captured after any optimizations are applied to
+ the pipeline so that it reflects what is really used.
+
+ This gets used for explain output.
+
+ @param pBsonObj the sort to record
+ */
+ void setSort(const BSONObj& sort) {
+ _sort = sort;
+ }
+
+ /**
+ * Informs this object of projection and dependency information.
+ *
+ * @param projection The projection that has been passed down to the query system.
+ * @param deps The output of DepsTracker::toParsedDeps.
+ */
+ void setProjection(const BSONObj& projection, const boost::optional<ParsedDeps>& deps);
+
+ /// returns -1 for no limit
+ long long getLimit() const;
+
+ /**
+ * If subsequent sources need no information from the cursor, the cursor can simply output empty
+ * documents, avoiding the overhead of converting BSONObjs to Documents.
+ */
+ void shouldProduceEmptyDocs() {
+ _shouldProduceEmptyDocs = true;
+ }
+
+ const std::string& getPlanSummaryStr() const;
+
+ const PlanSummaryStats& getPlanSummaryStats() const;
+
+protected:
+ void doInjectExpressionContext() final;
+
+private:
+ DocumentSourceCursor(const std::string& ns,
+ std::unique_ptr<PlanExecutor> exec,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ void loadBatch();
+
+ void recordPlanSummaryStr();
+
+ void recordPlanSummaryStats();
+
+ std::deque<Document> _currentBatch;
+
+ // BSONObj members must outlive _projection and cursor.
+ BSONObj _query;
+ BSONObj _sort;
+ BSONObj _projection;
+ bool _shouldProduceEmptyDocs = false;
+ boost::optional<ParsedDeps> _dependencies;
+ boost::intrusive_ptr<DocumentSourceLimit> _limit;
+ long long _docsAddedToBatches; // for _limit enforcement
+
+ const std::string _ns;
+ std::unique_ptr<PlanExecutor> _exec;
+ BSONObjSet _outputSorts;
+ std::string _planSummary;
+ PlanSummaryStats _planSummaryStats;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 55fd916e02c..7b8cd82de0f 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source_tee_consumer.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/tee_buffer.h"
#include "mongo/db/pipeline/value.h"
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 9c8cbce17e0..72d79fd56fe 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -39,6 +39,9 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 1dd78b75a09..8c4a2ca3ff5 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -30,9 +30,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/util/log.h"
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
new file mode 100644
index 00000000000..c38c311a4e5
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/field_path.h"
+
+namespace mongo {
+
+class DocumentSourceGeoNear : public DocumentSourceNeedsMongod, public SplittableDocumentSource {
+public:
+ static const long long kDefaultLimit;
+
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ /**
+ * Attempts to combine with a subsequent limit stage, setting the internal limit field
+ * as a result.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ bool isValidInitialSource() const final {
+ return true;
+ }
+ Value serialize(bool explain = false) const final;
+ BSONObjSet getOutputSorts() final {
+ return SimpleBSONObjComparator::kInstance.makeBSONObjSet(
+ {BSON(distanceField->fullPath() << -1)});
+ }
+
+ // Virtuals for SplittableDocumentSource
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
+
+ static char geoNearName[];
+
+ long long getLimit() {
+ return limit;
+ }
+
+ BSONObj getQuery() const {
+ return query;
+ };
+
+ // this should only be used for testing
+ static boost::intrusive_ptr<DocumentSourceGeoNear> create(
+ const boost::intrusive_ptr<ExpressionContext>& pCtx);
+
+private:
+ explicit DocumentSourceGeoNear(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ void parseOptions(BSONObj options);
+ BSONObj buildGeoNearCmd() const;
+ void runCommand();
+
+ // These fields describe the command to run.
+ // coords and distanceField are required, rest are optional
+ BSONObj coords; // "near" option, but near is a reserved keyword on windows
+ bool coordsIsArray;
+ std::unique_ptr<FieldPath> distanceField; // Using unique_ptr because FieldPath can't be empty
+ long long limit;
+ double maxDistance;
+ double minDistance;
+ BSONObj query;
+ bool spherical;
+ double distanceMultiplier;
+ std::unique_ptr<FieldPath> includeLocs;
+
+ // these fields are used while processing the results
+ BSONObj cmdOutput;
+ std::unique_ptr<BSONObjIterator> resultsIterator; // iterator over cmdOutput["results"]
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_geo_near_test.cpp b/src/mongo/db/pipeline/document_source_geo_near_test.cpp
index 2e4ef356114..858cd274f65 100644
--- a/src/mongo/db/pipeline/document_source_geo_near_test.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near_test.cpp
@@ -33,7 +33,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_geo_near.h"
+#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 7b6d6c0b22a..90475dfefcb 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "document_source.h"
+#include "mongo/db/pipeline/document_source_graph_lookup.h"
#include "mongo/base/init.h"
#include "mongo/db/bson/dotted_path_support.h"
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
new file mode 100644
index 00000000000..01473bf44fc
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -0,0 +1,219 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/lookup_set_cache.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+
+class DocumentSourceGraphLookUp final : public DocumentSourceNeedsMongod {
+public:
+ static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
+ const AggregationRequest& request, const BSONElement& spec);
+
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ void dispose() final;
+ BSONObjSet getOutputSorts() final;
+ void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+
+ /**
+ * Returns the 'as' path, and possibly the fields modified by an absorbed $unwind.
+ */
+ GetModPathsReturn getModifiedPaths() const final;
+
+ bool canSwapWithMatch() const final {
+ return true;
+ }
+
+ /**
+ * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ _startWith->addDependencies(deps);
+ return SEE_NEXT;
+ };
+
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
+ void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
+ collections->push_back(_from);
+ }
+
+ void doDetachFromOperationContext() final;
+
+ void doReattachToOperationContext(OperationContext* opCtx) final;
+
+ static boost::intrusive_ptr<DocumentSourceGraphLookUp> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ NamespaceString fromNs,
+ std::string asField,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<BSONObj> additionalFilter,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth,
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+protected:
+ void doInjectExpressionContext() final;
+
+private:
+ DocumentSourceGraphLookUp(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ NamespaceString from,
+ std::string as,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<BSONObj> additionalFilter,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth,
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> unwindSrc);
+
+ Value serialize(bool explain = false) const final {
+ // Should not be called; use serializeToArray instead.
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * Prepares the query to execute on the 'from' collection wrapped in a $match by using the
+ * contents of '_frontier'.
+ *
+ * Fills 'cached' with any values that were retrieved from the cache.
+ *
+ * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache.
+ * Otherwise, returns a query object.
+ */
+ boost::optional<BSONObj> makeMatchStageFromFrontier(BSONObjSet* cached);
+
+ /**
+ * If we have internalized a $unwind, getNext() dispatches to this function.
+ */
+ GetNextResult getNextUnwound();
+
+ /**
+ * Perform a breadth-first search of the 'from' collection. '_frontier' should already be
+ * populated with the values for the initial query. Populates '_discovered' with the result(s)
+ * of the query.
+ */
+ void doBreadthFirstSearch();
+
+ /**
+ * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a
+ * breadth-first search. Caller should check that _input is not boost::none.
+ */
+ void performSearch();
+
+ /**
+ * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying
+ * for 'queried'.
+ */
+ void addToCache(const BSONObj& result, const ValueUnorderedSet& queried);
+
+ /**
+ * Assert that '_visited' and '_frontier' have not exceeded the maximum meory usage, and then
+ * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'.
+ */
+ void checkMemoryUsage();
+
+ /**
+ * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier'
+ * with the object's 'connectTo' values.
+ *
+ * Returns whether '_visited' was updated, and thus, whether the search should recurse.
+ */
+ bool addToVisitedAndFrontier(BSONObj result, long long depth);
+
+ // $graphLookup options.
+ NamespaceString _from;
+ FieldPath _as;
+ FieldPath _connectFromField;
+ FieldPath _connectToField;
+ boost::intrusive_ptr<Expression> _startWith;
+ boost::optional<BSONObj> _additionalFilter;
+ boost::optional<FieldPath> _depthField;
+ boost::optional<long long> _maxDepth;
+
+ // The ExpressionContext used when performing aggregation pipelines against the '_from'
+ // namespace.
+ boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
+ // The aggregation pipeline to perform against the '_from' namespace.
+ std::vector<BSONObj> _fromPipeline;
+
+ size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
+ size_t _visitedUsageBytes = 0;
+ size_t _frontierUsageBytes = 0;
+
+ // Only used during the breadth-first search, tracks the set of values on the current frontier.
+ // We use boost::optional to defer initialization until the ExpressionContext containing the
+ // correct comparator is injected.
+ boost::optional<ValueUnorderedSet> _frontier;
+
+ // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the
+ // document from the foreign collection, value is the document itself. The keys are compared
+ // using the simple collation.
+ ValueUnorderedMap<BSONObj> _visited;
+
+ // Caches query results to avoid repeating any work. This structure is maintained across calls
+ // to getNext().
+ LookupSetCache _cache;
+
+ // When we have internalized a $unwind, we must keep track of the input document, since we will
+ // need it for multiple "getNext()" calls.
+ boost::optional<Document> _input;
+
+ // The variables that are in scope to be used by the '_startWith' expression.
+ std::unique_ptr<Variables> _variables;
+
+ // Keep track of a $unwind that was absorbed into this stage.
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind;
+
+ // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that
+ // field, tracking how many results we've returned so far for the current input document.
+ long long _outputIndex;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index ac25e48936e..2229332e859 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -28,13 +28,13 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
-
#include <algorithm>
#include <deque>
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_graph_lookup.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/stub_mongod_interface.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 31012cdc476..cf91c8eff5f 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -28,12 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
-
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
new file mode 100644
index 00000000000..76d9282ebb9
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -0,0 +1,197 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+
+class DocumentSourceGroup final : public DocumentSource, public SplittableDocumentSource {
+public:
+ using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
+ using GroupsMap = ValueUnorderedMap<Accumulators>;
+
+ static const size_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ // Virtuals from DocumentSource.
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ Value serialize(bool explain = false) const final;
+ GetNextResult getNext() final;
+ void dispose() final;
+ const char* getSourceName() const final;
+ BSONObjSet getOutputSorts() final;
+
+ /**
+ * Convenience method for creating a new $group stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceGroup> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& groupByExpression,
+ std::vector<AccumulationStatement> accumulationStatements,
+ Variables::Id numVariables,
+ size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
+
+ /**
+ * Parses 'elem' into a $group stage, or throws a UserException if 'elem' was an invalid
+ * specification.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Add an accumulator, which will become a field in each Document that results from grouping.
+ */
+ void addAccumulator(AccumulationStatement accumulationStatement);
+
+ /**
+ * Sets the expression to use to determine the group id of each document.
+ */
+ void setIdExpression(const boost::intrusive_ptr<Expression> idExpression);
+
+ /**
+ * Tell this source if it is doing a merge from shards. Defaults to false.
+ */
+ void setDoingMerge(bool doingMerge) {
+ _doingMerge = doingMerge;
+ }
+
+ bool isStreaming() const {
+ return _streaming;
+ }
+
+ // Virtuals for SplittableDocumentSource.
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+protected:
+ void doInjectExpressionContext() final;
+
+private:
+ explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ size_t maxMemoryUsageBytes = kDefaultMaxMemoryUsageBytes);
+
+ /**
+ * getNext() dispatches to one of these three depending on what type of $group it is. All three
+ * of these methods expect '_currentAccumulators' to have been reset before being called, and
+ * also expect initialize() to have been called already.
+ */
+ GetNextResult getNextStreaming();
+ GetNextResult getNextSpilled();
+ GetNextResult getNextStandard();
+
+ /**
+ * Attempt to identify an input sort order that allows us to turn into a streaming $group. If we
+ * find one, return it. Otherwise, return boost::none.
+ */
+ boost::optional<BSONObj> findRelevantInputSort() const;
+
+ /**
+ * Before returning anything, this source must prepare itself. In a streaming $group,
+ * initialize() requests the first document from the previous source, and uses it to prepare the
+ * accumulators. In an unsorted $group, initialize() exhausts the previous source before
+ * returning. The '_initialized' boolean indicates that initialize() has finished.
+ *
+ * This method may not be able to finish initialization in a single call if 'pSource' returns a
+ * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult
+ * encountered, which may be either kEOF or kPauseExecution.
+ */
+ GetNextResult initialize();
+
+ /**
+ * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group
+ * does not exhaust the previous stage before returning, and thus does not maintain as large a
+ * store of documents at any one time, only an unsorted group can spill to disk.
+ */
+ std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
+
+ Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
+
+ /**
+ * Computes the internal representation of the group key.
+ */
+ Value computeId(Variables* vars);
+
+ /**
+ * Converts the internal representation of the group key to the _id shape specified by the
+ * user.
+ */
+ Value expandId(const Value& val);
+
+ /**
+ * 'vFieldName' contains the field names for the result documents, 'vpAccumulatorFactory'
+ * contains the accumulator factories for the result documents, and 'vpExpression' contains the
+ * common expressions used by each instance of each accumulator in order to find the right-hand
+ * side of what gets added to the accumulator. These three vectors parallel each other.
+ */
+ std::vector<std::string> vFieldName;
+ std::vector<Accumulator::Factory> vpAccumulatorFactory;
+ std::vector<boost::intrusive_ptr<Expression>> vpExpression;
+
+ bool _doingMerge;
+ size_t _memoryUsageBytes = 0;
+ size_t _maxMemoryUsageBytes;
+ std::unique_ptr<Variables> _variables;
+ std::vector<std::string> _idFieldNames; // used when id is a document
+ std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
+
+ BSONObj _inputSort;
+ bool _streaming;
+ bool _initialized;
+
+ Value _currentId;
+ Accumulators _currentAccumulators;
+
+ // We use boost::optional to defer initialization until the ExpressionContext containing the
+ // correct comparator is injected, since the groups must be built using the comparator's
+ // definition of equality.
+ boost::optional<GroupsMap> _groups;
+
+ std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
+ bool _spilled;
+
+ // Only used when '_spilled' is false.
+ GroupsMap::iterator groupsIterator;
+
+ // Only used when '_spilled' is true.
+ std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
+ const bool _extSortAllowed;
+
+ std::pair<Value, Value> _firstPartOfNextGroup;
+ // Only used when '_sorted' is true.
+ boost::optional<Document> _firstDocOfNextGroup;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 32b6d1f6f87..d0582101597 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -41,7 +41,8 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
diff --git a/src/mongo/db/pipeline/document_source_index_stats.cpp b/src/mongo/db/pipeline/document_source_index_stats.cpp
index b6feff69eb7..3256f8e5211 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_index_stats.cpp
@@ -28,9 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_index_stats.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/server_options.h"
+#include "mongo/util/net/sock.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
new file mode 100644
index 00000000000..1c1bc521aeb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/collection_index_usage_tracker.h"
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Provides a document source interface to retrieve index statistics for a given namespace.
+ * Each document returned represents a single index and mongod instance.
+ */
+class DocumentSourceIndexStats final : public DocumentSourceNeedsMongod {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+
+ virtual bool isValidInitialSource() const final {
+ return true;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ CollectionIndexUsageMap _indexStatsMap;
+ CollectionIndexUsageMap::const_iterator _indexStatsIter;
+ std::string _processName;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index 75ca17eca48..634d635b22d 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
new file mode 100644
index 00000000000..9c04213045d
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ BSONObjSet getOutputSorts() final {
+ return pSource ? pSource->getOutputSorts()
+ : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
+ }
+
+ /**
+ * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ Value serialize(bool explain = false) const final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ return SEE_NEXT; // This doesn't affect needed fields
+ }
+
+ /**
+ Create a new limiting DocumentSource.
+
+ @param pExpCtx the expression context for the pipeline
+ @returns the DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSourceLimit> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
+
+ // Virtuals for SplittableDocumentSource
+ // Need to run on rounter. Running on shard as well is an optimization.
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ long long getLimit() const {
+ return _limit;
+ }
+ void setLimit(long long newLimit) {
+ _limit = newLimit;
+ }
+
+ /**
+ Create a limiting DocumentSource from BSON.
+
+ This is a convenience method that uses the above, and operates on
+ a BSONElement that has been deteremined to be an Object with an
+ element named $limit.
+
+ @param pBsonElement the BSONELement that defines the limit
+ @param pExpCtx the expression context
+ @returns the grouping DocumentSource
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
+
+ long long _limit;
+ long long _nReturned = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp
index 2fc83eed374..6294a897ff2 100644
--- a/src/mongo/db/pipeline/document_source_limit_test.cpp
+++ b/src/mongo/db/pipeline/document_source_limit_test.cpp
@@ -32,7 +32,9 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 7a0727bcf45..5371542505c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "document_source.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/base/init.h"
#include "mongo/db/jsobj.h"
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
new file mode 100644
index 00000000000..13118828b65
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/lookup_set_cache.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+
+/**
+ * Queries separate collection for equality matches with documents in the pipeline collection.
+ * Adds matching documents to a new array field in the input document.
+ */
+class DocumentSourceLookUp final : public DocumentSourceNeedsMongod,
+ public SplittableDocumentSource {
+public:
+ static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
+ const AggregationRequest& request, const BSONElement& spec);
+
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+
+ /**
+ * Returns the 'as' path, and possibly fields modified by an absorbed $unwind.
+ */
+ GetModPathsReturn getModifiedPaths() const final;
+
+ bool canSwapWithMatch() const final {
+ return true;
+ }
+
+ /**
+ * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwindSrc'
+ * field.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ void dispose() final;
+
+ BSONObjSet getOutputSorts() final {
+ return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
+ }
+
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
+ collections->push_back(_fromNs);
+ }
+
+ void doDetachFromOperationContext() final;
+
+ void doReattachToOperationContext(OperationContext* opCtx) final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Builds the BSONObj used to query the foreign collection and wraps it in a $match.
+ */
+ static BSONObj makeMatchStageFromInput(const Document& input,
+ const FieldPath& localFieldName,
+ const std::string& foreignFieldName,
+ const BSONObj& additionalFilter);
+
+ /**
+ * Helper to absorb an $unwind stage. Only used for testing this special behavior.
+ */
+ void setUnwindStage(const boost::intrusive_ptr<DocumentSourceUnwind>& unwind) {
+ invariant(!_handlingUnwind);
+ _unwindSrc = unwind;
+ _handlingUnwind = true;
+ }
+
+protected:
+ void doInjectExpressionContext() final;
+
+private:
+ DocumentSourceLookUp(NamespaceString fromNs,
+ std::string as,
+ std::string localField,
+ std::string foreignField,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(bool explain = false) const final {
+ // Should not be called; use serializeToArray instead.
+ MONGO_UNREACHABLE;
+ }
+
+ GetNextResult unwindResult();
+
+ NamespaceString _fromNs;
+ FieldPath _as;
+ FieldPath _localField;
+ FieldPath _foreignField;
+ std::string _foreignFieldFieldName;
+ boost::optional<BSONObj> _additionalFilter;
+
+ // The ExpressionContext used when performing aggregation pipelines against the '_fromNs'
+ // namespace.
+ boost::intrusive_ptr<ExpressionContext> _fromExpCtx;
+
+ // The aggregation pipeline to perform against the '_fromNs' namespace.
+ std::vector<BSONObj> _fromPipeline;
+
+ boost::intrusive_ptr<DocumentSourceMatch> _matchSrc;
+ boost::intrusive_ptr<DocumentSourceUnwind> _unwindSrc;
+
+ bool _handlingUnwind = false;
+ bool _handlingMatch = false;
+
+ // The following members are used to hold onto state across getNext() calls when
+ // '_handlingUnwind' is true.
+ long long _cursorIndex = 0;
+ boost::intrusive_ptr<Pipeline> _pipeline;
+ boost::optional<Document> _input;
+ boost::optional<Document> _nextValue;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 229b50906e9..cbf5b7838a2 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -37,7 +37,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/stub_mongod_interface.h"
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index d8df9116145..f58990a2669 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/expression_algo.h"
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
new file mode 100644
index 00000000000..4e2354c6d83
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "mongo/client/connpool.h"
+#include "mongo/db/matcher/matcher.h"
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceMatch final : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+ BSONObjSet getOutputSorts() final {
+ return pSource ? pSource->getOutputSorts()
+ : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
+ }
+
+ /**
+ * Attempts to combine with any subsequent $match stages, joining the query objects with a
+ * $and.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ void setSource(DocumentSource* Source) final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ /**
+ * Convenience method for creating a $match stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceMatch> create(
+ BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
+ * Parses a $match stage from 'elem'.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pCtx);
+
+ /**
+ * Access the MatchExpression stored inside the DocumentSourceMatch. Does not release ownership.
+ */
+ MatchExpression* getMatchExpression() const {
+ return _expression.get();
+ }
+
+ /**
+ * Combines the filter in this $match with the filter of 'other' using a $and, updating this
+ * match in place.
+ */
+ void joinMatchWith(boost::intrusive_ptr<DocumentSourceMatch> other);
+
+ /**
+ * Returns the query in MatchExpression syntax.
+ */
+ BSONObj getQuery() const;
+
+ /** Returns the portion of the match that can safely be promoted to before a $redact.
+ * If this returns an empty BSONObj, no part of this match may safely be promoted.
+ *
+ * To be safe to promote, removing a field from a document to be matched must not cause
+ * that document to be accepted when it would otherwise be rejected. As an example,
+ * {name: {$ne: "bob smith"}} accepts documents without a name field, which means that
+ * running this filter before a redact that would remove the name field would leak
+ * information. On the other hand, {age: {$gt:5}} is ok because it doesn't accept documents
+ * that have had their age field removed.
+ */
+ BSONObj redactSafePortion() const;
+
+ static bool isTextQuery(const BSONObj& query);
+ bool isTextQuery() const {
+ return _isTextQuery;
+ }
+
+ /**
+ * Attempt to split this $match into two stages, where the first is not dependent upon any path
+ * from 'fields', and where applying them in sequence is equivalent to applying this stage once.
+ *
+ * Will return two intrusive_ptrs to new $match stages, where the first pointer is independent
+ * of 'fields', and the second is dependent. Either pointer may be null, so be sure to check the
+ * return value.
+ *
+ * For example, {$match: {a: "foo", "b.c": 4}} split by "b" will return pointers to two stages:
+ * {$match: {a: "foo"}}, and {$match: {"b.c": 4}}.
+ */
+ std::pair<boost::intrusive_ptr<DocumentSourceMatch>, boost::intrusive_ptr<DocumentSourceMatch>>
+ splitSourceBy(const std::set<std::string>& fields);
+
+ /**
+ * Given a document 'input', extract 'fields' and produce a BSONObj with those values.
+ */
+ static BSONObj getObjectForMatch(const Document& input, const std::set<std::string>& fields);
+
+ /**
+ * Should be called _only_ on a MatchExpression that is a predicate on 'path', or subfields of
+ * 'path'. It is also invalid to call this method on a $match including a $elemMatch on 'path',
+ * for example: {$match: {'path': {$elemMatch: {'subfield': 3}}}}
+ *
+ * Returns a new DocumentSourceMatch that, if executed on the subdocument at 'path', is
+ * equivalent to 'expression'.
+ *
+ * For example, if the original expression is {$and: [{'a.b': {$gt: 0}}, {'a.d': {$eq: 3}}]},
+ * the new $match will have the expression {$and: [{b: {$gt: 0}}, {d: {$eq: 3}}]} after
+ * descending on the path 'a'.
+ */
+ static boost::intrusive_ptr<DocumentSourceMatch> descendMatchOnPath(
+ MatchExpression* matchExpr,
+ const std::string& path,
+ boost::intrusive_ptr<ExpressionContext> expCtx);
+
+ void doInjectExpressionContext();
+
+private:
+ DocumentSourceMatch(const BSONObj& query,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ void addDependencies(DepsTracker* deps) const;
+
+ std::unique_ptr<MatchExpression> _expression;
+
+ // Cache the dependencies so that we know what fields we need to serialize to BSON for matching.
+ DepsTracker _dependencies;
+
+ BSONObj _predicate;
+ bool _isTextQuery;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp
index eb4d40bbc97..61cd89143cd 100644
--- a/src/mongo/db/pipeline/document_source_match_test.cpp
+++ b/src/mongo/db/pipeline/document_source_match_test.cpp
@@ -35,7 +35,8 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 87a5700f206..866259c663e 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
new file mode 100644
index 00000000000..ad853a9f755
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/client/connpool.h"
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/cursor_id.h"
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceMergeCursors : public DocumentSource {
+public:
+ struct CursorDescriptor {
+ CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
+ : connectionString(std::move(connectionString)),
+ ns(std::move(ns)),
+ cursorId(cursorId) {}
+
+ ConnectionString connectionString;
+ std::string ns;
+ CursorId cursorId;
+ };
+
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ void dispose() final;
+ Value serialize(bool explain = false) const final;
+ bool isValidInitialSource() const final {
+ return true;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static boost::intrusive_ptr<DocumentSource> create(
+ std::vector<CursorDescriptor> cursorDescriptors,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /** Returns non-owning pointers to cursors managed by this stage.
+ * Call this instead of getNext() if you want access to the raw streams.
+ * This method should only be called at most once.
+ */
+ std::vector<DBClientCursor*> getCursors();
+
+ /**
+ * Returns the next object from the cursor, throwing an appropriate exception if the cursor
+ * reported an error. This is a better form of DBClientCursor::nextSafe.
+ */
+ static Document nextSafeFrom(DBClientCursor* cursor);
+
+private:
+ struct CursorAndConnection {
+ CursorAndConnection(const CursorDescriptor& cursorDescriptor);
+ ScopedDbConnection connection;
+ DBClientCursor cursor;
+ };
+
+ // using list to enable removing arbitrary elements
+ typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
+
+ DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ // Converts _cursorDescriptors into active _cursors.
+ void start();
+
+ // This is the description of cursors to merge.
+ const std::vector<CursorDescriptor> _cursorDescriptors;
+
+ // These are the actual cursors we are merging. Created lazily.
+ Cursors _cursors;
+ Cursors::iterator _currentCursor;
+
+ bool _unstarted;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 945aae771f7..686f4ef6f2e 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -28,8 +28,9 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
new file mode 100644
index 00000000000..7a600483661
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Used in testing to store documents without using the storage layer. Methods are not marked as
+ * final in order to allow tests to intercept calls if needed.
+ */
+class DocumentSourceMock : public DocumentSource {
+public:
+ DocumentSourceMock(std::deque<GetNextResult> results);
+ DocumentSourceMock(std::deque<GetNextResult> results,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ GetNextResult getNext() override;
+ const char* getSourceName() const override;
+ Value serialize(bool explain = false) const override;
+ void dispose() override;
+ bool isValidInitialSource() const override {
+ return true;
+ }
+ BSONObjSet getOutputSorts() override {
+ return sorts;
+ }
+
+ static boost::intrusive_ptr<DocumentSourceMock> create();
+
+ static boost::intrusive_ptr<DocumentSourceMock> create(Document doc);
+
+ static boost::intrusive_ptr<DocumentSourceMock> create(const GetNextResult& result);
+ static boost::intrusive_ptr<DocumentSourceMock> create(std::deque<GetNextResult> results);
+
+ static boost::intrusive_ptr<DocumentSourceMock> create(const char* json);
+ static boost::intrusive_ptr<DocumentSourceMock> create(
+ const std::initializer_list<const char*>& jsons);
+
+ void reattachToOperationContext(OperationContext* opCtx) {
+ isDetachedFromOpCtx = false;
+ }
+
+ void detachFromOperationContext() {
+ isDetachedFromOpCtx = true;
+ }
+
+ boost::intrusive_ptr<DocumentSource> optimize() override {
+ isOptimized = true;
+ return this;
+ }
+
+ void doInjectExpressionContext() override {
+ isExpCtxInjected = true;
+ }
+
+ // Return documents from front of queue.
+ std::deque<GetNextResult> queue;
+
+ bool isDisposed = false;
+ bool isDetachedFromOpCtx = false;
+ bool isOptimized = false;
+ bool isExpCtxInjected = false;
+
+ BSONObjSet sorts;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_mock_test.cpp b/src/mongo/db/pipeline/document_source_mock_test.cpp
index e4826ff5b24..67942a61246 100644
--- a/src/mongo/db/pipeline/document_source_mock_test.cpp
+++ b/src/mongo/db/pipeline/document_source_mock_test.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index be9f76d2207..7dab6365405 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/destructor_guard.h"
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
new file mode 100644
index 00000000000..4e8a678404a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceOut final : public DocumentSourceNeedsMongod, public SplittableDocumentSource {
+public:
+ static std::unique_ptr<LiteParsedDocumentSourceOneForeignCollection> liteParse(
+ const AggregationRequest& request, const BSONElement& spec);
+
+ // virtuals from DocumentSource
+ ~DocumentSourceOut() final;
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
+ // Virtuals for SplittableDocumentSource
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return NULL;
+ }
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ const NamespaceString& getOutputNs() const {
+ return _outputNs;
+ }
+
+ /**
+ Create a document source for output and pass-through.
+
+ This can be put anywhere in a pipeline and will store content as
+ well as pass it on.
+
+ @param pBsonElement the raw BSON specification for the source
+ @param pExpCtx the expression context for the pipeline
+ @returns the newly created document source
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceOut(const NamespaceString& outputNs,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Sets '_tempNs' to a unique temporary namespace, makes sure the output collection isn't
+ * sharded or capped, and saves the collection options and indexes of the target collection.
+ * Then creates the temporary collection we will insert into by copying the collection options
+ * and indexes from the target collection.
+ *
+ * Sets '_initialized' to true upon completion.
+ */
+ void initialize();
+
+ /**
+ * Inserts all of 'toInsert' into the temporary collection.
+ */
+ void spill(const std::vector<BSONObj>& toInsert);
+
+ bool _initialized = false;
+ bool _done = false;
+
+ // Holds on to the original collection options and index specs so we can check they didn't
+ // change during computation.
+ BSONObj _originalOutOptions;
+ std::list<BSONObj> _originalIndexes;
+
+ NamespaceString _tempNs; // output goes here as it is being processed.
+ const NamespaceString _outputNs; // output will go here after all data is processed.
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index 8ea144d0d70..fd44d534dde 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
diff --git a/src/mongo/db/pipeline/document_source_project.h b/src/mongo/db/pipeline/document_source_project.h
new file mode 100644
index 00000000000..869dce512b1
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_project.h
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+
+namespace mongo {
+
+/**
+ * The $project stage can be used for simple transformations such as including or excluding a set
+ * of fields, or can do more sophisticated things, like include some fields and add new "computed"
+ * fields, using the expression language. Note you can not mix an exclusion-style projection with
+ * adding or including any other fields.
+ */
+class DocumentSourceProject final {
+public:
+ /**
+ * Convenience method to create a $project stage from 'projectSpec'.
+ */
+ static boost::intrusive_ptr<DocumentSource> create(
+ BSONObj projectSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
+ * Parses a $project stage from the user-supplied BSON.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceProject() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_project_test.cpp b/src/mongo/db/pipeline/document_source_project_test.cpp
index 6dca42a98f3..1247de01579 100644
--- a/src/mongo/db/pipeline/document_source_project_test.cpp
+++ b/src/mongo/db/pipeline/document_source_project_test.cpp
@@ -36,7 +36,8 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_redact.cpp b/src/mongo/db/pipeline/document_source_redact.cpp
index e5d8932b0b2..795cbd99113 100644
--- a/src/mongo/db/pipeline/document_source_redact.cpp
+++ b/src/mongo/db/pipeline/document_source_redact.cpp
@@ -28,12 +28,13 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_redact.h"
#include <boost/optional.hpp>
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/value.h"
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
new file mode 100644
index 00000000000..f056411f765
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -0,0 +1,70 @@
+
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression.h"
+
+namespace mongo {
+
+class DocumentSourceRedact final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+
+ /**
+ * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
+ * stage.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
+ void doInjectExpressionContext() final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ Value serialize(bool explain = false) const final;
+
+private:
+ DocumentSourceRedact(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& previsit);
+
+ // These both work over _variables
+ boost::optional<Document> redactObject(); // redacts CURRENT
+ Value redactValue(const Value& in);
+
+ Variables::Id _currentId;
+ std::unique_ptr<Variables> _variables;
+ boost::intrusive_ptr<Expression> _expression;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_redact_test.cpp b/src/mongo/db/pipeline/document_source_redact_test.cpp
index a6a4bfd9f7b..461c25e3e83 100644
--- a/src/mongo/db/pipeline/document_source_redact_test.cpp
+++ b/src/mongo/db/pipeline/document_source_redact_test.cpp
@@ -32,7 +32,9 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_redact.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp
index 5625e43b1ab..ceef0e77061 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include <boost/smart_ptr/intrusive_ptr.hpp>
diff --git a/src/mongo/db/pipeline/document_source_replace_root.h b/src/mongo/db/pipeline/document_source_replace_root.h
new file mode 100644
index 00000000000..f3313ec5f7a
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_replace_root.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+
+namespace mongo {
+
+/*
+ * $replaceRoot takes an object containing only an expression in the newRoot field, and replaces
+ * each incoming document with the result of evaluating that expression. Throws an error if the
+ * given expression is not an object or if the expression evaluates to the "missing" Value. This
+ * is implemented as an extension of DocumentSourceSingleDocumentTransformation.
+ */
+class DocumentSourceReplaceRoot final {
+public:
+ /**
+ * Creates a new replaceRoot DocumentSource from the BSON specification of the $replaceRoot
+ * stage.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceReplaceRoot() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_replace_root_test.cpp b/src/mongo/db/pipeline/document_source_replace_root_test.cpp
index 16e6c2f5fa0..a65c838dd14 100644
--- a/src/mongo/db/pipeline/document_source_replace_root_test.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root_test.cpp
@@ -35,7 +35,8 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 53c3040de7d..a436d3e8a53 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/client.h"
#include "mongo/db/pipeline/document.h"
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
new file mode 100644
index 00000000000..ed0eec9c180
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+
+namespace mongo {
+
+class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource {
+public:
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ return SEE_NEXT;
+ }
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+ long long getSampleSize() const {
+ return _size;
+ }
+
+ void doInjectExpressionContext() final;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+private:
+ explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ long long _size;
+
+ // Uses a $sort stage to randomly sort the documents.
+ boost::intrusive_ptr<DocumentSourceSort> _sortStage;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
index 9cfa83075f3..f4b2299c9e8 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
#include <boost/math/distributions/beta.hpp>
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
new file mode 100644
index 00000000000..6e8a2ae1d39
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+
+/**
+ * This class is not a registered stage, it is only used as an optimized replacement for $sample
+ * when the storage engine allows us to use a random cursor.
+ */
+class DocumentSourceSampleFromRandomCursor final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ void doInjectExpressionContext() final;
+
+ static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long size,
+ std::string idField,
+ long long collectionSize);
+
+private:
+ DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long size,
+ std::string idField,
+ long long collectionSize);
+
+ /**
+ * Keep asking for documents from the random cursor until it yields a new document. Errors if a
+ * a document is encountered without a value for '_idField', or if the random cursor keeps
+ * returning duplicate elements.
+ */
+ GetNextResult getNextNonDuplicateDocument();
+
+ long long _size;
+
+ // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog.
+ std::string _idField;
+
+ // Keeps track of the documents that have been returned, since a random cursor is allowed to
+ // return duplicates. We use boost::optional to defer initialization until the ExpressionContext
+ // containing the correct comparator is injected.
+ boost::optional<ValueUnorderedSet> _seenDocs;
+
+ // The approximate number of documents in the collection (includes orphans).
+ const long long _nDocsInColl;
+
+ // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext()
+ // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if
+ // the documents were produced by a top-k random sort.
+ double _randMetaFieldVal = 1.0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp
index 22d6a4e0f25..30a59baf22b 100644
--- a/src/mongo/db/pipeline/document_source_sample_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_test.cpp
@@ -35,7 +35,9 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sample.h"
+#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/service_context.h"
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index 953bcc81e53..67dd7338395 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -28,11 +28,13 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/value.h"
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
new file mode 100644
index 00000000000..b2db5b0c3ae
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * This class is for DocumentSources that take in and return one document at a time, in a 1:1
+ * transformation. It should only be used via an alias that passes the transformation logic through
+ * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be
+ * created from BSON.
+ */
+class DocumentSourceSingleDocumentTransformation final : public DocumentSource {
+public:
+ /**
+ * This class defines the minimal interface that every parser wishing to take advantage of
+ * DocumentSourceSingleDocumentTransformation must implement.
+ *
+ * This interface ensures that DocumentSourceSingleDocumentTransformations are passed parsed
+ * objects that can execute the transformation and provide additional features like
+ * serialization and reporting and returning dependencies. The parser must also provide
+ * implementations for optimizing and adding the expression context, even if those functions do
+ * nothing.
+ */
+ class TransformerInterface {
+ public:
+ virtual ~TransformerInterface() = default;
+ virtual Document applyTransformation(Document input) = 0;
+ virtual void optimize() = 0;
+ virtual Document serialize(bool explain) const = 0;
+ virtual DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const = 0;
+ virtual void injectExpressionContext(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx) = 0;
+ virtual GetModPathsReturn getModifiedPaths() const = 0;
+ };
+
+ DocumentSourceSingleDocumentTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ std::unique_ptr<TransformerInterface> parsedTransform,
+ std::string name);
+
+ // virtuals from DocumentSource
+ const char* getSourceName() const final;
+ GetNextResult getNext() final;
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+ void dispose() final;
+ Value serialize(bool explain) const final;
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ void doInjectExpressionContext() final;
+ DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ GetModPathsReturn getModifiedPaths() const final;
+
+ bool canSwapWithMatch() const final {
+ return true;
+ }
+
+private:
+ // Stores transformation logic.
+ std::unique_ptr<TransformerInterface> _parsedTransform;
+
+ // Specific name of the transformation.
+ std::string _name;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index e052c24895d..27bb6d1d2a6 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -28,10 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
new file mode 100644
index 00000000000..92d087e3c75
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ /**
+ * Attempts to move a subsequent $limit before the skip, potentially allowing for forther
+ * optimizations earlier in the pipeline.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ Value serialize(bool explain = false) const final;
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+ BSONObjSet getOutputSorts() final {
+ return pSource ? pSource->getOutputSorts()
+ : SimpleBSONObjComparator::kInstance.makeBSONObjSet();
+ }
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ return SEE_NEXT; // This doesn't affect needed fields
+ }
+
+ // Virtuals for SplittableDocumentSource
+ // Need to run on rounter. Can't run on shards.
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return NULL;
+ }
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ long long getSkip() const {
+ return _nToSkip;
+ }
+ void setSkip(long long newSkip) {
+ _nToSkip = newSkip;
+ }
+
+ /**
+ * Convenience method for creating a $skip stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceSkip> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip);
+
+ /**
+ * Parses the user-supplied BSON into a $skip stage.
+ *
+ * Throws a UserException if 'elem' is an invalid $skip specification.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ long long nToSkip);
+
+ long long _nToSkip = 0;
+ long long _nSkippedSoFar = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_skip_test.cpp b/src/mongo/db/pipeline/document_source_skip_test.cpp
index 175efad4788..4a5710bc5bc 100644
--- a/src/mongo/db/pipeline/document_source_skip_test.cpp
+++ b/src/mongo/db/pipeline/document_source_skip_test.cpp
@@ -30,7 +30,8 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 156b0499d20..4f4f17da4f2 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -28,10 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
new file mode 100644
index 00000000000..d9266011a55
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -0,0 +1,197 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/sorter/sorter.h"
+
+namespace mongo {
+
+class Expression;
+
+class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
+public:
+ static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+
+ GetModPathsReturn getModifiedPaths() const final {
+ // A $sort does not modify any paths.
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}};
+ }
+
+ bool canSwapWithMatch() const final {
+ return true;
+ }
+
+ BSONObjSet getOutputSorts() final {
+ return allPrefixes(_sort);
+ }
+
+ /**
+ * Attempts to absorb a subsequent $limit stage so that it an perform a top-k sort.
+ */
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+ void dispose() final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+
+ /// Write out a Document whose contents are the sort key.
+ Document serializeSortKey(bool explain) const;
+
+ /**
+ * Parses a $sort stage from the user-supplied BSON.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
+ * Convenience method for creating a $sort stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceSort> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ BSONObj sortOrder,
+ long long limit = -1,
+ uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
+
+ /**
+ * Returns -1 for no limit.
+ */
+ long long getLimit() const;
+
+ /**
+ * Loads a document to be sorted. This can be used to sort a stream of documents that are not
+ * coming from another DocumentSource. Once all documents have been added, the caller must call
+ * loadingDone() before using getNext() to receive the documents in sorted order.
+ */
+ void loadDocument(const Document& doc);
+
+ /**
+ * Signals to the sort stage that there will be no more input documents. It is an error to call
+ * loadDocument() once this method returns.
+ */
+ void loadingDone();
+
+ /**
+ * Instructs the sort stage to use the given set of cursors as inputs, to merge documents that
+ * have already been sorted.
+ */
+ void populateFromCursors(const std::vector<DBClientCursor*>& cursors);
+
+ bool isPopulated() {
+ return _populated;
+ };
+
+ boost::intrusive_ptr<DocumentSourceLimit> getLimitSrc() const {
+ return limitSrc;
+ }
+
+private:
+ explicit DocumentSourceSort(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(bool explain = false) const final {
+ MONGO_UNREACHABLE; // Should call serializeToArray instead.
+ }
+
+ /**
+ * Helper to add a sort key to this stage.
+ */
+ void addKey(StringData fieldPath, bool ascending);
+
+ /**
+ * Before returning anything, we have to consume all input and sort it. This method consumes all
+ * input and prepares the sorted stream '_output'.
+ *
+ * This method may not be able to finish populating the sorter in a single call if 'pSource'
+ * returns a DocumentSource::GetNextResult::kPauseExecution, so it returns the last
+ * GetNextResult encountered, which may be either kEOF or kPauseExecution.
+ */
+ GetNextResult populate();
+ bool _populated = false;
+
+ BSONObj _sort;
+
+ SortOptions makeSortOptions() const;
+
+ // This is used to merge pre-sorted results from a DocumentSourceMergeCursors.
+ class IteratorFromCursor;
+
+ /* these two parallel each other */
+ typedef std::vector<boost::intrusive_ptr<Expression>> SortKey;
+ SortKey vSortKey;
+ std::vector<char> vAscending; // used like std::vector<bool> but without specialization
+
+ /// Extracts the fields in vSortKey from the Document;
+ Value extractKey(const Document& d) const;
+
+ /// Compare two Values according to the specified sort key.
+ int compare(const Value& lhs, const Value& rhs) const;
+
+ typedef Sorter<Value, Document> MySorter;
+
+ /**
+ * Absorbs 'limit', enabling a top-k sort. It is safe to call this multiple times, it will keep
+ * the smallest limit.
+ */
+ void setLimitSrc(boost::intrusive_ptr<DocumentSourceLimit> limit) {
+ if (!limitSrc || limit->getLimit() < limitSrc->getLimit()) {
+ limitSrc = limit;
+ }
+ }
+
+ // For MySorter
+ class Comparator {
+ public:
+ explicit Comparator(const DocumentSourceSort& source) : _source(source) {}
+ int operator()(const MySorter::Data& lhs, const MySorter::Data& rhs) const {
+ return _source.compare(lhs.first, rhs.first);
+ }
+
+ private:
+ const DocumentSourceSort& _source;
+ };
+
+ boost::intrusive_ptr<DocumentSourceLimit> limitSrc;
+
+ uint64_t _maxMemoryUsageBytes;
+ bool _done;
+ bool _mergingPresorted;
+ std::unique_ptr<MySorter> _sorter;
+ std::unique_ptr<MySorter::Iterator> _output;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
index 021843b3e7f..48cfaa9f559 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
@@ -28,9 +28,11 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_sort_by_count.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.h b/src/mongo/db/pipeline/document_source_sort_by_count.h
new file mode 100644
index 00000000000..2444d048259
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sort_by_count.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * The $sortByCount stage is an alias for a $group stage followed by a $sort stage.
+ */
+class DocumentSourceSortByCount final {
+public:
+ /**
+ * Returns a $group stage followed by a $sort stage.
+ */
+ static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceSortByCount() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
index bed658e616a..46afa82a3a5 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
@@ -36,7 +36,10 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_sort_by_count.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 022b2040ba3..52da88fd616 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -40,7 +40,8 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/temp_dir.h"
diff --git a/src/mongo/db/pipeline/document_source_unwind.cpp b/src/mongo/db/pipeline/document_source_unwind.cpp
index 02b8dc62355..368b29afbd4 100644
--- a/src/mongo/db/pipeline/document_source_unwind.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/document.h"
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
new file mode 100644
index 00000000000..9a9c466d037
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/field_path.h"
+
+namespace mongo {
+
+class DocumentSourceUnwind final : public DocumentSource {
+public:
+ // virtuals from DocumentSource
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+ BSONObjSet getOutputSorts() final;
+
+ /**
+ * Returns the unwound path, and the 'includeArrayIndex' path, if specified.
+ */
+ GetModPathsReturn getModifiedPaths() const final;
+
+ bool canSwapWithMatch() const final {
+ return true;
+ }
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ /**
+ * Creates a new $unwind DocumentSource from a BSON specification.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static boost::intrusive_ptr<DocumentSourceUnwind> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const std::string& path,
+ bool includeNullIfEmptyOrMissing,
+ const boost::optional<std::string>& includeArrayIndex);
+
+ std::string getUnwindPath() const {
+ return _unwindPath.fullPath();
+ }
+
+ bool preserveNullAndEmptyArrays() const {
+ return _preserveNullAndEmptyArrays;
+ }
+
+ const boost::optional<FieldPath>& indexPath() const {
+ return _indexPath;
+ }
+
+private:
+ DocumentSourceUnwind(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const FieldPath& fieldPath,
+ bool includeNullIfEmptyOrMissing,
+ const boost::optional<FieldPath>& includeArrayIndex);
+
+ // Configuration state.
+ const FieldPath _unwindPath;
+ // Documents that have a nullish value, or an empty array for the field '_unwindPath', will pass
+ // through the $unwind stage unmodified if '_preserveNullAndEmptyArrays' is true.
+ const bool _preserveNullAndEmptyArrays;
+ // If set, the $unwind stage will include the array index in the specified path, overwriting any
+ // existing value, setting to null when the value was a non-array or empty array.
+ const boost::optional<FieldPath> _indexPath;
+
+ // Iteration state.
+ class Unwinder;
+ std::unique_ptr<Unwinder> _unwinder;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_unwind_test.cpp b/src/mongo/db/pipeline/document_source_unwind_test.cpp
index a782d7b521f..1e62f3bc9fc 100644
--- a/src/mongo/db/pipeline/document_source_unwind_test.cpp
+++ b/src/mongo/db/pipeline/document_source_unwind_test.cpp
@@ -39,7 +39,8 @@
#include "mongo/bson/json.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/value_comparator.h"
diff --git a/src/mongo/db/pipeline/parsed_aggregation_projection.h b/src/mongo/db/pipeline/parsed_aggregation_projection.h
index 6e8d9100845..a637365565e 100644
--- a/src/mongo/db/pipeline/parsed_aggregation_projection.h
+++ b/src/mongo/db/pipeline/parsed_aggregation_projection.h
@@ -34,7 +34,7 @@
#include <memory>
#include "mongo/bson/bsonelement.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/field_path.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index f003df39da9..b296d3f85df 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -40,6 +40,11 @@
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_geo_near.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/mongoutils/str.h"
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 2666ff1cccb..bb681edd3f1 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -49,6 +49,12 @@
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/document_source_sample.h"
+#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 315b0b10cfd..7b4331d60b8 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/field_path.h"
diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp
index 10a692922e0..0a8fc7d22a2 100644
--- a/src/mongo/db/pipeline/tee_buffer_test.cpp
+++ b/src/mongo/db/pipeline/tee_buffer_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/tee_buffer.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp
index 3d58a3126f2..379f0364839 100644
--- a/src/mongo/db/views/view_catalog.cpp
+++ b/src/mongo/db/views/view_catalog.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/views/resolved_view.h"
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index be0d83e3671..1c99ca82663 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index e1319cecf3f..f6d5e7e9ec4 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -44,7 +44,7 @@
#include "mongo/db/json.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
-#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/plan_executor.h"
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 2cd504c1af4..b58abbc9972 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregation_request.h"
+#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index 5efd0b449a1..969f62ec907 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/config.h"