summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_group.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.h')
-rw-r--r--src/mongo/db/pipeline/document_source_group.h268
1 files changed, 19 insertions, 249 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 2a3ac2ea89c..a1f8e9b2b9a 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -32,90 +32,19 @@
#include <memory>
#include <utility>
-#include "mongo/db/pipeline/accumulation_statement.h"
-#include "mongo/db/pipeline/accumulator.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/memory_usage_tracker.h"
-#include "mongo/db/pipeline/transformer_interface.h"
-#include "mongo/db/sorter/sorter.h"
+#include "mongo/db/pipeline/document_source_group_base.h"
namespace mongo {
/**
- * GroupFromFirstTransformation consists of a list of (field name, expression pairs). It returns a
- * document synthesized by assigning each field name in the output document to the result of
- * evaluating the corresponding expression. If the expression evaluates to missing, we assign a
- * value of BSONNULL. This is necessary to match the semantics of $first for missing fields.
+ * This class represents hash based group implementation that stores all groups until source is
+ * depleted and only then starts outputing documents.
*/
-class GroupFromFirstDocumentTransformation final : public TransformerInterface {
+class DocumentSourceGroup final : public DocumentSourceGroupBase {
public:
- GroupFromFirstDocumentTransformation(
- const std::string& groupId,
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs)
- : _accumulatorExprs(std::move(accumulatorExprs)), _groupId(groupId) {}
-
- TransformerType getType() const final {
- return TransformerType::kGroupFromFirstDocument;
- }
-
- /**
- * The path of the field that we are grouping on: i.e., the field in the input document that we
- * will use to create the _id field of the ouptut document.
- */
- const std::string& groupId() const {
- return _groupId;
- }
-
- Document applyTransformation(const Document& input) final;
-
- void optimize() final;
-
- Document serializeTransformation(
- boost::optional<ExplainOptions::Verbosity> explain) const final;
-
- DepsTracker::State addDependencies(DepsTracker* deps) const final;
-
- void addVariableRefs(std::set<Variables::Id>* refs) const final;
-
- DocumentSource::GetModPathsReturn getModifiedPaths() const final;
-
- static std::unique_ptr<GroupFromFirstDocumentTransformation> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const std::string& groupId,
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs);
-
-private:
- std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs;
- std::string _groupId;
-};
-
-class DocumentSourceGroup final : public DocumentSource {
-public:
- using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>;
- using GroupsMap = ValueUnorderedMap<Accumulators>;
-
static constexpr StringData kStageName = "$group"_sd;
- boost::intrusive_ptr<DocumentSource> optimize() final;
- DepsTracker::State getDependencies(DepsTracker* deps) const final;
- void addVariableRefs(std::set<Variables::Id>* refs) const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
const char* getSourceName() const final;
- GetModPathsReturn getModifiedPaths() const final;
- StringMap<boost::intrusive_ptr<Expression>> getIdFields() const;
-
- /**
- * Can be used to change or swap out individual _id fields, but should not be used
- * once execution has begun.
- */
- std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields();
- const std::vector<AccumulationStatement>& getAccumulatedFields() const;
-
- /**
- * Can be used to change or swap out individual accumulated fields, but should not be used
- * once execution has begun.
- */
- std::vector<AccumulationStatement>& getMutableAccumulatedFields();
/**
* Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none,
@@ -133,199 +62,40 @@ public:
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- StageConstraints constraints(StreamType::kBlocking,
- PositionRequirement::kNone,
- HostTypeRequirement::kNone,
- DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed,
- TransactionRequirement::kAllowed,
- LookupRequirement::kAllowed,
- UnionRequirement::kAllowed);
- constraints.canSwapWithMatch = true;
- return constraints;
- }
-
- /**
- * Add an accumulator, which will become a field in each Document that results from grouping.
- */
- void addAccumulator(AccumulationStatement accumulationStatement);
-
- /**
- * Sets the expression to use to determine the group id of each document.
- */
- void setIdExpression(boost::intrusive_ptr<Expression> idExpression);
-
- /**
- * Returns the expression to use to determine the group id of each document.
- */
- boost::intrusive_ptr<Expression> getIdExpression() const;
-
- /**
- * Returns true if this $group stage represents a 'global' $group which is merging together
- * results from earlier partial groups.
- */
- bool doingMerge() const {
- return _doingMerge;
- }
-
- /**
- * Tell this source if it is doing a merge from shards. Defaults to false.
- */
- void setDoingMerge(bool doingMerge) {
- _doingMerge = doingMerge;
- }
-
- /**
- * Returns true if this $group stage used disk during execution and false otherwise.
- */
- bool usedDisk() final {
- return _stats.spills > 0;
- }
-
- const SpecificStats* getSpecificStats() const final {
- return &_stats;
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final;
- bool canRunInParallelBeforeWriteStage(
- const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final;
-
- /**
- * When possible, creates a document transformer that transforms the first document in a group
- * into one of the output documents of the $group stage. This is possible when we are grouping
- * on a single field and all accumulators are $first (or there are no accumluators).
- *
- * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group,
- * in which case this transformation can replace the actual $group stage in the pipeline
- * (SERVER-9507).
- */
- std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument()
- const;
-
- /**
- * Returns maximum allowed memory footprint.
- */
- size_t getMaxMemoryUsageBytes() const;
-
- // True if this $group can be pushed down to SBE.
- bool sbeCompatible() const {
- return _sbeCompatible;
- }
+ static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage(
+ BSONElement elem,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<size_t> maxMemoryUsageBytes);
protected:
GetNextResult doGetNext() final;
- void doDispose() final;
+
+ bool isSpecFieldReserved(StringData) final {
+ return false;
+ }
private:
explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<size_t> maxMemoryUsageBytes = boost::none);
/**
- * getNext() dispatches to one of these three depending on what type of $group it is. These
- * methods expect '_currentAccumulators' to have been reset before being called, and also expect
- * initialize() to have been called already.
- */
- GetNextResult getNextSpilled();
- GetNextResult getNextStandard();
-
- /**
- * Before returning anything, this source must prepare itself. In a streaming $group,
- * initialize() requests the first document from the previous source, and uses it to prepare the
- * accumulators. In an unsorted $group, initialize() exhausts the previous source before
- * returning. The '_initialized' boolean indicates that initialize() has finished.
+ * Before returning anything, this source must prepare itself. performBlockingGroup() exhausts
+ * the previous source before
+ * returning. The '_groupsReady' boolean indicates that performBlockingGroup() has finished.
*
* This method may not be able to finish initialization in a single call if 'pSource' returns a
* DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult
* encountered, which may be either kEOF or kPauseExecution.
*/
- GetNextResult initialize();
+ GetNextResult performBlockingGroup();
/**
- * Initializes this $group after any children are potentially initialized see initialize() for
+ * Initializes this $group after any children are initialized. See performBlockingGroup() for
* more details.
*/
- GetNextResult initializeSelf(GetNextResult input);
-
- /**
- * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group
- * does not exhaust the previous stage before returning, and thus does not maintain as large a
- * store of documents at any one time, only an unsorted group can spill to disk.
- */
- std::shared_ptr<Sorter<Value, Value>::Iterator> spill();
-
- /**
- * If we ran out of memory, finish all the pending operations so that some memory
- * can be freed.
- */
- void freeMemory();
-
- Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput);
-
- /**
- * Computes the internal representation of the group key.
- */
- Value computeId(const Document& root);
-
- /**
- * Converts the internal representation of the group key to the _id shape specified by the
- * user.
- */
- Value expandId(const Value& val);
-
- /**
- * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'.
- */
- bool pathIncludedInGroupKeys(const std::string& dottedPath) const;
-
- /**
- * Cleans up any pending memory usage. Throws error, if memory usage is above
- * 'maxMemoryUsageBytes' and cannot spill to disk.
- *
- * Returns true, if the caller should spill to disk, false otherwise.
- */
- bool shouldSpillWithAttemptToSaveMemory();
-
- std::vector<AccumulationStatement> _accumulatedFields;
-
- bool _doingMerge;
-
- MemoryUsageTracker _memoryTracker;
-
- GroupStats _stats;
-
- std::shared_ptr<Sorter<Value, Value>::File> _file;
-
- // If the expression for the '_id' field represents a non-empty object, we track its fields'
- // names in '_idFieldNames'.
- std::vector<std::string> _idFieldNames;
- // Expressions for the individual fields when '_id' produces a document in the order of
- // '_idFieldNames' or the whole expression otherwise.
- std::vector<boost::intrusive_ptr<Expression>> _idExpressions;
-
- bool _initialized;
-
- Value _currentId;
- Accumulators _currentAccumulators;
-
- // We use boost::optional to defer initialization until the ExpressionContext containing the
- // correct comparator is injected, since the groups must be built using the comparator's
- // definition of equality.
- boost::optional<GroupsMap> _groups;
-
- std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles;
- bool _spilled;
-
- // Only used when '_spilled' is false.
- GroupsMap::iterator groupsIterator;
-
- // Only used when '_spilled' is true.
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
-
- std::pair<Value, Value> _firstPartOfNextGroup;
+ GetNextResult performBlockingGroupSelf(GetNextResult input);
- bool _sbeCompatible;
+ bool _groupsReady;
};
} // namespace mongo