/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include "mongo/platform/basic.h" #include #include #include #include "mongo/base/init.h" #include "mongo/bson/bsontypes.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/summation.h" namespace mongo { class Accumulator : public RefCountable { public: using Factory = boost::intrusive_ptr (*)( const boost::intrusive_ptr& expCtx); Accumulator(const boost::intrusive_ptr& expCtx) : _expCtx(expCtx) {} /** Process input and update internal state. * merging should be true when processing outputs from getValue(true). */ void process(const Value& input, bool merging) { processInternal(input, merging); } /** Marks the end of the evaluate() phase and return accumulated result. * toBeMerged should be true when the outputs will be merged by process(). */ virtual Value getValue(bool toBeMerged) = 0; /// The name of the op as used in a serialization of the pipeline. virtual const char* getOpName() const = 0; int memUsageForSorter() const { dassert(_memUsageBytes != 0); // This would mean subclass didn't set it return _memUsageBytes; } /// Reset this accumulator to a fresh state ready to receive input. virtual void reset() = 0; virtual bool isAssociative() const { return false; } virtual bool isCommutative() const { return false; } protected: /// Update subclass's internal state based on input virtual void processInternal(const Value& input, bool merging) = 0; const boost::intrusive_ptr& getExpressionContext() const { return _expCtx; } /// subclasses are expected to update this as necessary int _memUsageBytes = 0; private: boost::intrusive_ptr _expCtx; }; class AccumulatorAddToSet final : public Accumulator { public: explicit AccumulatorAddToSet(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); bool isAssociative() const final { return true; } bool isCommutative() const final { return true; } private: ValueUnorderedSet _set; }; class AccumulatorFirst final : public Accumulator { public: explicit AccumulatorFirst(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: bool _haveFirst; Value _first; }; class AccumulatorLast final : public Accumulator { public: explicit AccumulatorLast(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: Value _last; }; class AccumulatorSum final : public Accumulator { public: explicit AccumulatorSum(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); bool isAssociative() const final { return true; } bool isCommutative() const final { return true; } private: BSONType totalType = NumberInt; DoubleDoubleSummation nonDecimalTotal; Decimal128 decimalTotal; }; class AccumulatorMinMax : public Accumulator { public: enum Sense : int { MIN = 1, MAX = -1, // Used to "scale" comparison. }; AccumulatorMinMax(const boost::intrusive_ptr& expCtx, Sense sense); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; bool isAssociative() const final { return true; } bool isCommutative() const final { return true; } private: Value _val; const Sense _sense; }; class AccumulatorMax final : public AccumulatorMinMax { public: explicit AccumulatorMax(const boost::intrusive_ptr& expCtx) : AccumulatorMinMax(expCtx, MAX) {} static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); }; class AccumulatorMin final : public AccumulatorMinMax { public: explicit AccumulatorMin(const boost::intrusive_ptr& expCtx) : AccumulatorMinMax(expCtx, MIN) {} static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); }; class AccumulatorPush final : public Accumulator { public: explicit AccumulatorPush(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: std::vector vpValue; }; class AccumulatorAvg final : public Accumulator { public: explicit AccumulatorAvg(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: /** * The total of all values is partitioned between those that are decimals, and those that are * not decimals, so the decimal total needs to add the non-decimal. */ Decimal128 _getDecimalTotal() const; bool _isDecimal; DoubleDoubleSummation _nonDecimalTotal; Decimal128 _decimalTotal; long long _count; }; class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr& expCtx, bool isSamp); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; private: const bool _isSamp; long long _count; double _mean; double _m2; // Running sum of squares of delta from mean. Named to match algorithm. }; class AccumulatorStdDevPop final : public AccumulatorStdDev { public: explicit AccumulatorStdDevPop(const boost::intrusive_ptr& expCtx) : AccumulatorStdDev(expCtx, false) {} static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); }; class AccumulatorStdDevSamp final : public AccumulatorStdDev { public: explicit AccumulatorStdDevSamp(const boost::intrusive_ptr& expCtx) : AccumulatorStdDev(expCtx, true) {} static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); }; class AccumulatorMergeObjects : public Accumulator { public: AccumulatorMergeObjects(const boost::intrusive_ptr& expCtx); void processInternal(const Value& input, bool merging) final; Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: MutableDocument _output; }; }