/**
* Copyright (c) 2011 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
#pragma once
#include "mongo/pch.h"
#include
#include "mongo/bson/bsontypes.h"
#include "mongo/db/pipeline/value.h"
namespace mongo {
class Accumulator : public RefCountable {
public:
/** Process input and update internal state.
* merging should be true when processing outputs from getValue(true).
*/
void process(const Value& input, bool merging) {
processInternal(input, merging);
}
/** Marks the end of the evaluate() phase and return accumulated result.
* toBeMerged should be true when the outputs will be merged by process().
*/
virtual Value getValue(bool toBeMerged) const = 0;
/// The name of the op as used in a serialization of the pipeline.
virtual const char* getOpName() const = 0;
int memUsageForSorter() const {
dassert(_memUsageBytes != 0); // This would mean subclass didn't set it
return _memUsageBytes;
}
/// Reset this accumulator to a fresh state ready to receive input.
virtual void reset() = 0;
protected:
Accumulator() : _memUsageBytes(0) {}
/// Update subclass's internal state based on input
virtual void processInternal(const Value& input, bool merging) = 0;
/// subclasses are expected to update this as necessary
int _memUsageBytes;
};
class AccumulatorAddToSet : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
private:
AccumulatorAddToSet();
typedef boost::unordered_set SetType;
SetType set;
};
class AccumulatorFirst : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
private:
AccumulatorFirst();
bool _haveFirst;
Value _first;
};
class AccumulatorLast : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
private:
AccumulatorLast();
Value _last;
};
class AccumulatorSum : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
protected: /* reused by AccumulatorAvg */
AccumulatorSum();
BSONType totalType;
long long longTotal;
double doubleTotal;
// count is only used by AccumulatorAvg, but lives here to avoid counting non-numeric values
long long count;
};
class AccumulatorMinMax : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr createMin();
static intrusive_ptr createMax();
private:
AccumulatorMinMax(int theSense);
Value _val;
const int _sense; /* 1 for min, -1 for max; used to "scale" comparison */
};
class AccumulatorPush : public Accumulator {
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
private:
AccumulatorPush();
vector vpValue;
};
class AccumulatorAvg : public AccumulatorSum {
typedef AccumulatorSum Super;
public:
virtual void processInternal(const Value& input, bool merging);
virtual Value getValue(bool toBeMerged) const;
virtual const char* getOpName() const;
virtual void reset();
static intrusive_ptr create();
private:
AccumulatorAvg();
};
}