diff options
author | Ivan Fefer <ivan.fefer@mongodb.com> | 2022-11-18 10:48:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-18 11:17:40 +0000 |
commit | 4cfd9b936b68622274e39100b7859ea8eb089ad8 (patch) | |
tree | 1a2c3a89c9e6cf8fece70d2f0cc9b7a5c72e610e /src/mongo/db/pipeline/document_source_group.h | |
parent | 092b2eec8182bf540d73bd77649617ad2a36300d (diff) | |
download | mongo-4cfd9b936b68622274e39100b7859ea8eb089ad8.tar.gz |
SERVER-70267 Add DocumentSourceStreamingGroup
Diffstat (limited to 'src/mongo/db/pipeline/document_source_group.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 268 |
1 files changed, 19 insertions, 249 deletions
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 2a3ac2ea89c..a1f8e9b2b9a 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -32,90 +32,19 @@ #include <memory> #include <utility> -#include "mongo/db/pipeline/accumulation_statement.h" -#include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/memory_usage_tracker.h" -#include "mongo/db/pipeline/transformer_interface.h" -#include "mongo/db/sorter/sorter.h" +#include "mongo/db/pipeline/document_source_group_base.h" namespace mongo { /** - * GroupFromFirstTransformation consists of a list of (field name, expression pairs). It returns a - * document synthesized by assigning each field name in the output document to the result of - * evaluating the corresponding expression. If the expression evaluates to missing, we assign a - * value of BSONNULL. This is necessary to match the semantics of $first for missing fields. + * This class represents hash based group implementation that stores all groups until source is + * depleted and only then starts outputting documents. */ -class GroupFromFirstDocumentTransformation final : public TransformerInterface { +class DocumentSourceGroup final : public DocumentSourceGroupBase { public: - GroupFromFirstDocumentTransformation( - const std::string& groupId, - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs) - : _accumulatorExprs(std::move(accumulatorExprs)), _groupId(groupId) {} - - TransformerType getType() const final { - return TransformerType::kGroupFromFirstDocument; - } - - /** - * The path of the field that we are grouping on: i.e., the field in the input document that we - * will use to create the _id field of the output document. 
- */ - const std::string& groupId() const { - return _groupId; - } - - Document applyTransformation(const Document& input) final; - - void optimize() final; - - Document serializeTransformation( - boost::optional<ExplainOptions::Verbosity> explain) const final; - - DepsTracker::State addDependencies(DepsTracker* deps) const final; - - void addVariableRefs(std::set<Variables::Id>* refs) const final; - - DocumentSource::GetModPathsReturn getModifiedPaths() const final; - - static std::unique_ptr<GroupFromFirstDocumentTransformation> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const std::string& groupId, - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> accumulatorExprs); - -private: - std::vector<std::pair<std::string, boost::intrusive_ptr<Expression>>> _accumulatorExprs; - std::string _groupId; -}; - -class DocumentSourceGroup final : public DocumentSource { -public: - using Accumulators = std::vector<boost::intrusive_ptr<AccumulatorState>>; - using GroupsMap = ValueUnorderedMap<Accumulators>; - static constexpr StringData kStageName = "$group"_sd; - boost::intrusive_ptr<DocumentSource> optimize() final; - DepsTracker::State getDependencies(DepsTracker* deps) const final; - void addVariableRefs(std::set<Variables::Id>* refs) const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; const char* getSourceName() const final; - GetModPathsReturn getModifiedPaths() const final; - StringMap<boost::intrusive_ptr<Expression>> getIdFields() const; - - /** - * Can be used to change or swap out individual _id fields, but should not be used - * once execution has begun. - */ - std::vector<boost::intrusive_ptr<Expression>>& getMutableIdFields(); - const std::vector<AccumulationStatement>& getAccumulatedFields() const; - - /** - * Can be used to change or swap out individual accumulated fields, but should not be used - * once execution has begun. 
- */ - std::vector<AccumulationStatement>& getMutableAccumulatedFields(); /** * Convenience method for creating a new $group stage. If maxMemoryUsageBytes is boost::none, @@ -133,199 +62,40 @@ public: */ static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints(StreamType::kBlocking, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed, - TransactionRequirement::kAllowed, - LookupRequirement::kAllowed, - UnionRequirement::kAllowed); - constraints.canSwapWithMatch = true; - return constraints; - } - - /** - * Add an accumulator, which will become a field in each Document that results from grouping. - */ - void addAccumulator(AccumulationStatement accumulationStatement); - - /** - * Sets the expression to use to determine the group id of each document. - */ - void setIdExpression(boost::intrusive_ptr<Expression> idExpression); - - /** - * Returns the expression to use to determine the group id of each document. - */ - boost::intrusive_ptr<Expression> getIdExpression() const; - - /** - * Returns true if this $group stage represents a 'global' $group which is merging together - * results from earlier partial groups. - */ - bool doingMerge() const { - return _doingMerge; - } - - /** - * Tell this source if it is doing a merge from shards. Defaults to false. - */ - void setDoingMerge(bool doingMerge) { - _doingMerge = doingMerge; - } - - /** - * Returns true if this $group stage used disk during execution and false otherwise. 
- */ - bool usedDisk() final { - return _stats.spills > 0; - } - - const SpecificStats* getSpecificStats() const final { - return &_stats; - } - - boost::optional<DistributedPlanLogic> distributedPlanLogic() final; - bool canRunInParallelBeforeWriteStage( - const OrderedPathSet& nameOfShardKeyFieldsUponEntryToStage) const final; - - /** - * When possible, creates a document transformer that transforms the first document in a group - * into one of the output documents of the $group stage. This is possible when we are grouping - * on a single field and all accumulators are $first (or there are no accumulators). - * - * It is sometimes possible to use a DISTINCT_SCAN to scan the first document of each group, - * in which case this transformation can replace the actual $group stage in the pipeline - * (SERVER-9507). - */ - std::unique_ptr<GroupFromFirstDocumentTransformation> rewriteGroupAsTransformOnFirstDocument() - const; - - /** - * Returns maximum allowed memory footprint. - */ - size_t getMaxMemoryUsageBytes() const; - - // True if this $group can be pushed down to SBE. - bool sbeCompatible() const { - return _sbeCompatible; - } + static boost::intrusive_ptr<DocumentSource> createFromBsonWithMaxMemoryUsage( + BSONElement elem, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<size_t> maxMemoryUsageBytes); protected: GetNextResult doGetNext() final; - void doDispose() final; + + bool isSpecFieldReserved(StringData) final { + return false; + } private: explicit DocumentSourceGroup(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<size_t> maxMemoryUsageBytes = boost::none); /** - * getNext() dispatches to one of these three depending on what type of $group it is. These - * methods expect '_currentAccumulators' to have been reset before being called, and also expect - * initialize() to have been called already. 
- */ - GetNextResult getNextSpilled(); - GetNextResult getNextStandard(); - - /** - * Before returning anything, this source must prepare itself. In a streaming $group, - * initialize() requests the first document from the previous source, and uses it to prepare the - * accumulators. In an unsorted $group, initialize() exhausts the previous source before - * returning. The '_initialized' boolean indicates that initialize() has finished. + * Before returning anything, this source must prepare itself. performBlockingGroup() exhausts + * the previous source before + * returning. The '_groupsReady' boolean indicates that performBlockingGroup() has finished. * * This method may not be able to finish initialization in a single call if 'pSource' returns a * DocumentSource::GetNextResult::kPauseExecution, so it returns the last GetNextResult * encountered, which may be either kEOF or kPauseExecution. */ - GetNextResult initialize(); + GetNextResult performBlockingGroup(); /** - * Initializes this $group after any children are potentially initialized see initialize() for + * Initializes this $group after any children are initialized. See performBlockingGroup() for * more details. */ - GetNextResult initializeSelf(GetNextResult input); - - /** - * Spill groups map to disk and returns an iterator to the file. Note: Since a sorted $group - * does not exhaust the previous stage before returning, and thus does not maintain as large a - * store of documents at any one time, only an unsorted group can spill to disk. - */ - std::shared_ptr<Sorter<Value, Value>::Iterator> spill(); - - /** - * If we ran out of memory, finish all the pending operations so that some memory - * can be freed. - */ - void freeMemory(); - - Document makeDocument(const Value& id, const Accumulators& accums, bool mergeableOutput); - - /** - * Computes the internal representation of the group key. 
- */ - Value computeId(const Document& root); - - /** - * Converts the internal representation of the group key to the _id shape specified by the - * user. - */ - Value expandId(const Value& val); - - /** - * Returns true if 'dottedPath' is one of the group keys present in '_idExpressions'. - */ - bool pathIncludedInGroupKeys(const std::string& dottedPath) const; - - /** - * Cleans up any pending memory usage. Throws error, if memory usage is above - * 'maxMemoryUsageBytes' and cannot spill to disk. - * - * Returns true, if the caller should spill to disk, false otherwise. - */ - bool shouldSpillWithAttemptToSaveMemory(); - - std::vector<AccumulationStatement> _accumulatedFields; - - bool _doingMerge; - - MemoryUsageTracker _memoryTracker; - - GroupStats _stats; - - std::shared_ptr<Sorter<Value, Value>::File> _file; - - // If the expression for the '_id' field represents a non-empty object, we track its fields' - // names in '_idFieldNames'. - std::vector<std::string> _idFieldNames; - // Expressions for the individual fields when '_id' produces a document in the order of - // '_idFieldNames' or the whole expression otherwise. - std::vector<boost::intrusive_ptr<Expression>> _idExpressions; - - bool _initialized; - - Value _currentId; - Accumulators _currentAccumulators; - - // We use boost::optional to defer initialization until the ExpressionContext containing the - // correct comparator is injected, since the groups must be built using the comparator's - // definition of equality. - boost::optional<GroupsMap> _groups; - - std::vector<std::shared_ptr<Sorter<Value, Value>::Iterator>> _sortedFiles; - bool _spilled; - - // Only used when '_spilled' is false. - GroupsMap::iterator groupsIterator; - - // Only used when '_spilled' is true. 
- std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - - std::pair<Value, Value> _firstPartOfNextGroup; + GetNextResult performBlockingGroupSelf(GetNextResult input); - bool _sbeCompatible; + bool _groupsReady; }; } // namespace mongo |