diff options
Diffstat (limited to 'kafka/metrics/stats/percentiles.py')
-rw-r--r-- | kafka/metrics/stats/percentiles.py | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py new file mode 100644 index 0000000..84e7160 --- /dev/null +++ b/kafka/metrics/stats/percentiles.py @@ -0,0 +1,72 @@ +from kafka.metrics import AnonMeasurable, NamedMeasurable +from kafka.metrics.compound_stat import AbstractCompoundStat +from kafka.metrics.stats import Histogram +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class BucketSizing(object): + CONSTANT = 0 + LINEAR = 1 + + +class Percentiles(AbstractSampledStat, AbstractCompoundStat): + """A compound stat that reports one or more percentiles""" + def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, + percentiles=None): + super(Percentiles, self).__init__(0.0) + self._percentiles = percentiles or [] + self._buckets = int(size_in_bytes / 4) + if bucketing == BucketSizing.CONSTANT: + self._bin_scheme = Histogram.ConstantBinScheme(self._buckets, + min_val, max_val) + elif bucketing == BucketSizing.LINEAR: + if min_val != 0.0: + raise ValueError('Linear bucket sizing requires min_val' + ' to be 0.0.') + self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) + else: + ValueError('Unknown bucket type: %s' % bucketing) + + def stats(self): + measurables = [] + + def make_measure_fn(pct): + return lambda config, now: self.value(config, now, + pct / 100.0) + + for percentile in self._percentiles: + measure_fn = make_measure_fn(percentile.percentile) + stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn)) + measurables.append(stat) + return measurables + + def value(self, config, now, quantile): + self.purge_obsolete_samples(config, now) + count = sum(sample.event_count for sample in self._samples) + if count == 0.0: + return float('NaN') + sum_val = 0.0 + quant = float(quantile) + for b in range(self._buckets): + for sample in self._samples: + assert type(sample) is self.HistogramSample + hist = sample.histogram.counts + sum_val += hist[b] + if sum_val / count > quant: + return self._bin_scheme.from_bin(b) + return float('inf') + + def combine(self, samples, config, now): + return self.value(config, now, 0.5) + + def new_sample(self, time_ms): + return Percentiles.HistogramSample(self._bin_scheme, time_ms) + + def update(self, sample, config, value, time_ms): + assert type(sample) is self.HistogramSample + sample.histogram.record(value) + + class HistogramSample(AbstractSampledStat.Sample): + def __init__(self, scheme, now): + super(Percentiles.HistogramSample, self).__init__(0.0, now) + self.histogram = Histogram(scheme) |