summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/percentiles.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/metrics/stats/percentiles.py')
-rw-r--r--kafka/metrics/stats/percentiles.py72
1 files changed, 72 insertions, 0 deletions
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py
new file mode 100644
index 0000000..84e7160
--- /dev/null
+++ b/kafka/metrics/stats/percentiles.py
@@ -0,0 +1,72 @@
+from kafka.metrics import AnonMeasurable, NamedMeasurable
+from kafka.metrics.compound_stat import AbstractCompoundStat
+from kafka.metrics.stats import Histogram
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class BucketSizing(object):
+ CONSTANT = 0
+ LINEAR = 1
+
+
+class Percentiles(AbstractSampledStat, AbstractCompoundStat):
+ """A compound stat that reports one or more percentiles"""
+ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
+ percentiles=None):
+ super(Percentiles, self).__init__(0.0)
+ self._percentiles = percentiles or []
+ self._buckets = int(size_in_bytes / 4)
+ if bucketing == BucketSizing.CONSTANT:
+ self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
+ min_val, max_val)
+ elif bucketing == BucketSizing.LINEAR:
+ if min_val != 0.0:
+ raise ValueError('Linear bucket sizing requires min_val'
+ ' to be 0.0.')
+ self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
+ else:
+ ValueError('Unknown bucket type: %s' % bucketing)
+
+ def stats(self):
+ measurables = []
+
+ def make_measure_fn(pct):
+ return lambda config, now: self.value(config, now,
+ pct / 100.0)
+
+ for percentile in self._percentiles:
+ measure_fn = make_measure_fn(percentile.percentile)
+ stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
+ measurables.append(stat)
+ return measurables
+
+ def value(self, config, now, quantile):
+ self.purge_obsolete_samples(config, now)
+ count = sum(sample.event_count for sample in self._samples)
+ if count == 0.0:
+ return float('NaN')
+ sum_val = 0.0
+ quant = float(quantile)
+ for b in range(self._buckets):
+ for sample in self._samples:
+ assert type(sample) is self.HistogramSample
+ hist = sample.histogram.counts
+ sum_val += hist[b]
+ if sum_val / count > quant:
+ return self._bin_scheme.from_bin(b)
+ return float('inf')
+
+ def combine(self, samples, config, now):
+ return self.value(config, now, 0.5)
+
+ def new_sample(self, time_ms):
+ return Percentiles.HistogramSample(self._bin_scheme, time_ms)
+
+ def update(self, sample, config, value, time_ms):
+ assert type(sample) is self.HistogramSample
+ sample.histogram.record(value)
+
+ class HistogramSample(AbstractSampledStat.Sample):
+ def __init__(self, scheme, now):
+ super(Percentiles.HistogramSample, self).__init__(0.0, now)
+ self.histogram = Histogram(scheme)