summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/rate.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/metrics/stats/rate.py')
-rw-r--r--kafka/metrics/stats/rate.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py
new file mode 100644
index 0000000..3ce2e74
--- /dev/null
+++ b/kafka/metrics/stats/rate.py
@@ -0,0 +1,115 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class TimeUnit(object):
+ _names = {
+ 'nanosecond': 0,
+ 'microsecond': 1,
+ 'millisecond': 2,
+ 'second': 3,
+ 'minute': 4,
+ 'hour': 5,
+ 'day': 6,
+ }
+
+ NANOSECONDS = _names['nanosecond']
+ MICROSECONDS = _names['microsecond']
+ MILLISECONDS = _names['millisecond']
+ SECONDS = _names['second']
+ MINUTES = _names['minute']
+ HOURS = _names['hour']
+ DAYS = _names['day']
+
+ @staticmethod
+ def get_name(time_unit):
+ return TimeUnit._names[time_unit]
+
+
+class Rate(AbstractMeasurableStat):
+ """
+ The rate of the given quantity. By default this is the total observed
+ over a set of samples from a sampled statistic divided by the elapsed
+ time over the sample windows. Alternative AbstractSampledStat
+ implementations can be provided, however, to record the rate of
+ occurrences (e.g. the count of values measured over the time interval)
+ or other such values.
+ """
+ def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
+ self._stat = sampled_stat or SampledTotal()
+ self._unit = time_unit
+
+ def unit_name(self):
+ return TimeUnit.get_name(self._unit)
+
+ def record(self, config, value, time_ms):
+ self._stat.record(config, value, time_ms)
+
+ def measure(self, config, now):
+ value = self._stat.measure(config, now)
+ return float(value) / self.convert(self.window_size(config, now))
+
+ def window_size(self, config, now):
+ # purge old samples before we compute the window size
+ self._stat.purge_obsolete_samples(config, now)
+
+ """
+ Here we check the total amount of time elapsed since the oldest
+ non-obsolete window. This give the total window_size of the batch
+ which is the time used for Rate computation. However, there is
+ an issue if we do not have sufficient data for e.g. if only
+ 1 second has elapsed in a 30 second window, the measured rate
+ will be very high. Hence we assume that the elapsed time is
+ always N-1 complete windows plus whatever fraction of the final
+ window is complete.
+
+ Note that we could simply count the amount of time elapsed in
+ the current window and add n-1 windows to get the total time,
+ but this approach does not account for sleeps. AbstractSampledStat
+ only creates samples whenever record is called, if no record is
+ called for a period of time that time is not accounted for in
+ window_size and produces incorrect results.
+ """
+ total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
+ # Check how many full windows of data we have currently retained
+ num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
+ min_full_windows = config.samples - 1
+
+ # If the available windows are less than the minimum required,
+ # add the difference to the totalElapsedTime
+ if num_full_windows < min_full_windows:
+ total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
+ config.time_window_ms)
+
+ return total_elapsed_time_ms
+
+ def convert(self, time_ms):
+ if self._unit == TimeUnit.NANOSECONDS:
+ return time_ms * 1000.0 * 1000.0
+ elif self._unit == TimeUnit.MICROSECONDS:
+ return time_ms * 1000.0
+ elif self._unit == TimeUnit.MILLISECONDS:
+ return time_ms
+ elif self._unit == TimeUnit.SECONDS:
+ return time_ms / 1000.0
+ elif self._unit == TimeUnit.MINUTES:
+ return time_ms / (60.0 * 1000.0)
+ elif self._unit == TimeUnit.HOURS:
+ return time_ms / (60.0 * 60.0 * 1000.0)
+ elif self._unit == TimeUnit.DAYS:
+ return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
+ else:
+ raise ValueError('Unknown unit: %s' % self._unit)
+
+
+class SampledTotal(AbstractSampledStat):
+ def __init__(self, initial_value=None):
+ if initial_value is not None:
+ raise ValueError('initial_value cannot be set on SampledTotal')
+ super(SampledTotal, self).__init__(0.0)
+
+ def update(self, sample, config, value, time_ms):
+ sample.value += value
+
+ def combine(self, samples, config, now):
+ return float(sum(sample.value for sample in samples))