#pragma once #include #include #include "mongo/db/pipeline/document_source_group_base.h" namespace mongo { /** * This class represents streaming group implementation that can only be used when at least one of * _id fields is monotonic. It stores and output groups in batches. All groups in the batch has * the same value of monotonic id fields. * * For example, if the inputs are sorted by "x", we could use a batched streaming algorithm to * perform the grouping for {$group: {_id: {x: "$x", y: "$y"}}}. * * Groups are processes in batches. One batch corresponds to a set of groups when each monotonic * id field have the same value. Non-monotonic fields can have different values, so we still may * have multiple groups and even spill to disk, but we still consume significanty less memory * than general hash based group. * When a document with a different value in at least one group id field is encountered, it is * cached in '_firstDocumentOfNextBatch', current groups are finalized and returned in * subsequent getNext() called and when the current batch is depleted, memory is freeed and the * process starts again. * * TODO SERVER-71437 Implement an optimization for a special case where all group fields are * monotonic * - we don't need any hashing in this case. */ class DocumentSourceStreamingGroup final : public DocumentSourceGroupBase { public: static constexpr StringData kStageName = "$_internalStreamingGroup"_sd; const char* getSourceName() const final; /** * Convenience method for creating a new $_internalStreamingGroup stage. If maxMemoryUsageBytes * is boost::none, then it will actually use the value of * internalDocumentSourceGroupMaxMemoryBytes. */ static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx, const boost::intrusive_ptr& groupByExpression, std::vector monotonicExpressionIndexes, std::vector accumulationStatements, boost::optional maxMemoryUsageBytes = boost::none); /** * Parses 'elem' into a $_internalStreamingGroup stage, or throws a AssertionException if 'elem' * was an invalid specification. */ static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& expCtx); static boost::intrusive_ptr createFromBsonWithMaxMemoryUsage( BSONElement elem, const boost::intrusive_ptr& expCtx, boost::optional maxMemoryUsageBytes); protected: GetNextResult doGetNext() final; bool isSpecFieldReserved(StringData fieldName) final; void serializeAdditionalFields(MutableDocument& out, SerializationOptions opts = SerializationOptions()) const final; private: static constexpr StringData kMonotonicIdFieldsSpecField = "$monotonicIdFields"_sd; explicit DocumentSourceStreamingGroup( const boost::intrusive_ptr& expCtx, boost::optional maxMemoryUsageBytes = boost::none); GetNextResult getNextDocument(); GetNextResult readyNextBatch(); /** * Readies next batch after all children are initialized. See readyNextBatch() for * more details. */ GetNextResult readyNextBatchInner(GetNextResult input); bool isBatchFinished(const Value& id); template bool checkForBatchEndAndUpdateLastIdValues(const IdValueGetter& idValueGetter); std::vector _monotonicExpressionIndexes; std::vector _lastMonotonicIdFieldValues; boost::optional _firstDocumentOfNextBatch; bool _sourceDepleted; }; } // namespace mongo